From 1244eb6b3712051a15290ee5745997dab2235791 Mon Sep 17 00:00:00 2001
From: joccau
Date: Thu, 9 Dec 2021 14:46:43 +0800
Subject: [PATCH] remove cdclog in br

Signed-off-by: joccau
---
 br/cmd/br/restore.go              |  35 --
 br/pkg/cdclog/buffer.go           | 217 ---------
 br/pkg/cdclog/decoder.go          | 299 ------------
 br/pkg/cdclog/decoder_test.go     | 169 -------
 br/pkg/cdclog/puller.go           | 184 --------
 br/pkg/restore/ingester.go        | 607 ------------------
 br/pkg/restore/log_client.go      | 761 ------------------------
 br/pkg/restore/log_client_test.go | 126 -----
 br/pkg/task/restore_log.go        | 143 ------
 9 files changed, 2541 deletions(-)
 delete mode 100644 br/pkg/cdclog/buffer.go
 delete mode 100644 br/pkg/cdclog/decoder.go
 delete mode 100644 br/pkg/cdclog/decoder_test.go
 delete mode 100644 br/pkg/cdclog/puller.go
 delete mode 100644 br/pkg/restore/ingester.go
 delete mode 100644 br/pkg/restore/log_client.go
 delete mode 100644 br/pkg/restore/log_client_test.go
 delete mode 100644 br/pkg/task/restore_log.go

diff --git a/br/cmd/br/restore.go b/br/cmd/br/restore.go
index 753ac2846e81d..c57712ba8bddc 100644
--- a/br/cmd/br/restore.go
+++ b/br/cmd/br/restore.go
@@ -37,26 +37,6 @@ func runRestoreCommand(command *cobra.Command, cmdName string) error {
 	return nil
 }
 
-func runLogRestoreCommand(command *cobra.Command) error {
-	cfg := task.LogRestoreConfig{Config: task.Config{LogProgress: HasLogFile()}}
-	if err := cfg.ParseFromFlags(command.Flags()); err != nil {
-		command.SilenceUsage = false
-		return errors.Trace(err)
-	}
-
-	ctx := GetDefaultContext()
-	if cfg.EnableOpenTracing {
-		var store *appdash.MemoryStore
-		ctx, store = trace.TracerStartSpan(ctx)
-		defer trace.TracerFinishSpan(ctx, store)
-	}
-	if err := task.RunLogRestore(GetDefaultContext(), tidbGlue, &cfg); err != nil {
-		log.Error("failed to restore", zap.Error(err))
-		return errors.Trace(err)
-	}
-	return nil
-}
-
 func runRestoreRawCommand(command *cobra.Command, cmdName string) error {
 	cfg := task.RestoreRawConfig{
 		RawKvConfig: task.RawKvConfig{Config: task.Config{LogProgress: HasLogFile()}},
@@ -102,7 +82,6 @@ func NewRestoreCommand() *cobra.Command {
 		newFullRestoreCommand(),
 		newDBRestoreCommand(),
 		newTableRestoreCommand(),
-		newLogRestoreCommand(),
 		newRawRestoreCommand(),
 	)
 	task.DefineRestoreFlags(command.PersistentFlags())
@@ -149,20 +128,6 @@ func newTableRestoreCommand() *cobra.Command {
 	return command
 }
 
-func newLogRestoreCommand() *cobra.Command {
-	command := &cobra.Command{
-		Use:   "cdclog",
-		Short: "(experimental) restore data from cdc log backup",
-		Args:  cobra.NoArgs,
-		RunE: func(cmd *cobra.Command, _ []string) error {
-			return runLogRestoreCommand(cmd)
-		},
-	}
-	task.DefineFilterFlags(command, filterOutSysAndMemTables)
-	task.DefineLogRestoreFlags(command)
-	return command
-}
-
 func newRawRestoreCommand() *cobra.Command {
 	command := &cobra.Command{
 		Use:   "raw",
diff --git a/br/pkg/cdclog/buffer.go b/br/pkg/cdclog/buffer.go
deleted file mode 100644
index 7cfe2ea638d94..0000000000000
--- a/br/pkg/cdclog/buffer.go
+++ /dev/null
@@ -1,217 +0,0 @@
-// Copyright 2020 PingCAP, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and -// limitations under the License. - -package cdclog - -import ( - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - "github.com/pingcap/tidb/br/pkg/kv" - "github.com/pingcap/tidb/meta/autoid" - "github.com/pingcap/tidb/table" - "github.com/pingcap/tidb/types" - "go.uber.org/zap" -) - -// TableBuffer represents the kv buffer of this table. -// we restore one tableBuffer in one goroutine. -// this is the concurrent unit of log restore. -type TableBuffer struct { - KvPairs []kv.Row - count int - size int64 - - KvEncoder kv.Encoder - tableInfo table.Table - allocator autoid.Allocators - - flushKVSize int64 - flushKVPairs int - - colNames []string - colPerm []int -} - -func newKVEncoder(allocators autoid.Allocators, tbl table.Table) (kv.Encoder, error) { - encTable, err := table.TableFromMeta(allocators, tbl.Meta()) - if err != nil { - return nil, errors.Trace(err) - } - return kv.NewTableKVEncoder(encTable, &kv.SessionOptions{ - Timestamp: time.Now().Unix(), - // TODO get the version from TiDB cluster - // currently TiDB only support v1 and v2, and since 4.0 - // the default RowFormatVersion is 2, so I think - // we can implement the row version retrieve from cluster in the future - // when TiDB decide to support v3 RowFormatVersion. - RowFormatVersion: "2", - }), nil -} - -// NewTableBuffer creates TableBuffer. -func NewTableBuffer(tbl table.Table, allocators autoid.Allocators, flushKVPairs int, flushKVSize int64) *TableBuffer { - tb := &TableBuffer{ - KvPairs: make([]kv.Row, 0, flushKVPairs), - flushKVPairs: flushKVPairs, - flushKVSize: flushKVSize, - } - if tbl != nil { - tb.ReloadMeta(tbl, allocators) - } - return tb -} - -// ResetTableInfo set tableInfo to nil for next reload. -func (t *TableBuffer) ResetTableInfo() { - t.tableInfo = nil -} - -// TableInfo returns the table info of this buffer. -func (t *TableBuffer) TableInfo() table.Table { - return t.tableInfo -} - -// TableID returns the table id of this buffer. -func (t *TableBuffer) TableID() int64 { - if t.tableInfo != nil { - return t.tableInfo.Meta().ID - } - return 0 -} - -// ReloadMeta reload columns after -// 1. table buffer created. -// 2. every ddl executed. 
-func (t *TableBuffer) ReloadMeta(tbl table.Table, allocator autoid.Allocators) { - columns := tbl.Meta().Cols() - colNames := make([]string, 0, len(columns)) - colPerm := make([]int, 0, len(columns)+1) - - for i, col := range columns { - colNames = append(colNames, col.Name.String()) - colPerm = append(colPerm, i) - } - if kv.TableHasAutoRowID(tbl.Meta()) { - colPerm = append(colPerm, -1) - } - if t.allocator == nil { - t.allocator = allocator - } - t.tableInfo = tbl - t.colNames = colNames - t.colPerm = colPerm - // reset kv encoder after meta changed - t.KvEncoder = nil -} - -func (t *TableBuffer) translateToDatum(row map[string]Column) ([]types.Datum, error) { - cols := make([]types.Datum, 0, len(row)) - for _, col := range t.colNames { - val, err := row[col].ToDatum() - if err != nil { - return nil, errors.Trace(err) - } - cols = append(cols, val) - } - return cols, nil -} - -func (t *TableBuffer) appendRow( - row map[string]Column, - item *SortItem, - encodeFn func(row []types.Datum, - rowID int64, - columnPermutation []int) (kv.Row, int, error), -) error { - cols, err := t.translateToDatum(row) - if err != nil { - return errors.Trace(err) - } - pair, size, err := encodeFn(cols, item.RowID, t.colPerm) - if err != nil { - return errors.Trace(err) - } - t.KvPairs = append(t.KvPairs, pair) - t.size += int64(size) - t.count++ - return nil -} - -// Append appends the item to this buffer. -func (t *TableBuffer) Append(item *SortItem) error { - var err error - log.Debug("Append item to buffer", - zap.Stringer("table", t.tableInfo.Meta().Name), - zap.Any("item", item), - ) - row := item.Data.(*MessageRow) - - if t.KvEncoder == nil { - // lazy create kv encoder - log.Debug("create kv encoder lazily", - zap.Any("alloc", t.allocator), zap.Any("tbl", t.tableInfo)) - t.KvEncoder, err = newKVEncoder(t.allocator, t.tableInfo) - if err != nil { - return errors.Trace(err) - } - } - - if row.PreColumns != nil { - // remove old keys - log.Debug("process update event", zap.Any("row", row)) - err := t.appendRow(row.PreColumns, item, t.KvEncoder.RemoveRecord) - if err != nil { - return errors.Trace(err) - } - } - if row.Update != nil { - // Add new columns - if row.PreColumns == nil { - log.Debug("process insert event", zap.Any("row", row)) - } - err := t.appendRow(row.Update, item, t.KvEncoder.AddRecord) - if err != nil { - return errors.Trace(err) - } - } - if row.Delete != nil { - // Remove current columns - log.Debug("process delete event", zap.Any("row", row)) - err := t.appendRow(row.Delete, item, t.KvEncoder.RemoveRecord) - if err != nil { - return errors.Trace(err) - } - } - return nil -} - -// ShouldApply tells whether we should flush memory kv buffer to storage. -func (t *TableBuffer) ShouldApply() bool { - // flush when reached flush kv len or flush size - return t.size >= t.flushKVSize || t.count >= t.flushKVPairs -} - -// IsEmpty tells buffer is empty. -func (t *TableBuffer) IsEmpty() bool { - return t.size == 0 -} - -// Clear reset the buffer. -func (t *TableBuffer) Clear() { - t.KvPairs = t.KvPairs[:0] - t.count = 0 - t.size = 0 -} diff --git a/br/pkg/cdclog/decoder.go b/br/pkg/cdclog/decoder.go deleted file mode 100644 index 0334d3ff76f57..0000000000000 --- a/br/pkg/cdclog/decoder.go +++ /dev/null @@ -1,299 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cdclog - -import ( - "bytes" - "encoding/base64" - "encoding/binary" - "encoding/json" - "strconv" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - berrors "github.com/pingcap/tidb/br/pkg/errors" - timodel "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/parser/mysql" - "github.com/pingcap/tidb/types" - "go.uber.org/zap" -) - -// ColumnFlagType represents the type of Column. -type ColumnFlagType uint64 - -// ItemType represents the type of SortItem. -type ItemType uint - -const ( - // RowChanged represents dml type. - RowChanged ItemType = 1 << ItemType(iota) - // DDL represents ddl type. - DDL -) - -// TODO let cdc import these flags. -const ( - // BatchVersion1 represents the version of batch format. - BatchVersion1 uint64 = 1 -) - -const ( - // BinaryFlag means the Column charset is binary. - BinaryFlag ColumnFlagType = 1 << ColumnFlagType(iota) - // HandleKeyFlag means the Column is selected as the handle key. - HandleKeyFlag - // GeneratedColumnFlag means the Column is a generated Column. - GeneratedColumnFlag - // PrimaryKeyFlag means the Column is primary key. - PrimaryKeyFlag - // UniqueKeyFlag means the Column is unique key. - UniqueKeyFlag - // MultipleKeyFlag means the Column is multiple key. - MultipleKeyFlag - // NullableFlag means the Column is nullable. - NullableFlag -) - -// Column represents the column data define by cdc. -type Column struct { - Type byte `json:"t"` - - // WhereHandle is deprecated - // WhereHandle is replaced by HandleKey in Flag. - WhereHandle *bool `json:"h,omitempty"` - Flag ColumnFlagType `json:"f"` - Value interface{} `json:"v"` -} - -// ToDatum encode Column to Datum. -func (c Column) ToDatum() (types.Datum, error) { - var ( - val interface{} - err error - ) - - switch c.Type { - case mysql.TypeTiny, mysql.TypeShort, mysql.TypeInt24, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeYear: - val, err = c.Value.(json.Number).Int64() - if err != nil { - return types.Datum{}, errors.Trace(err) - } - case mysql.TypeFloat, mysql.TypeDouble: - val, err = c.Value.(json.Number).Float64() - if err != nil { - return types.Datum{}, errors.Trace(err) - } - default: - val = c.Value - } - return types.NewDatum(val), nil -} - -func formatColumnVal(c Column) Column { - switch c.Type { - case mysql.TypeVarchar, mysql.TypeString: - if s, ok := c.Value.(string); ok { - // according to open protocol https://docs.pingcap.com/tidb/dev/ticdc-open-protocol - // CHAR/BINARY have the same type: 254 - // VARCHAR/VARBINARY have the same type: 15 - // we need to process it by its flag. 
- if c.Flag&BinaryFlag != 0 { - val, err := strconv.Unquote("\"" + s + "\"") - if err != nil { - log.Panic("invalid Column value, please report a bug", zap.Any("col", c), zap.Error(err)) - } - c.Value = val - } - } - case mysql.TypeTinyBlob, mysql.TypeMediumBlob, - mysql.TypeLongBlob, mysql.TypeBlob: - if s, ok := c.Value.(string); ok { - var err error - c.Value, err = base64.StdEncoding.DecodeString(s) - if err != nil { - log.Panic("invalid Column value, please report a bug", zap.Any("col", c), zap.Error(err)) - } - } - case mysql.TypeBit: - if s, ok := c.Value.(json.Number); ok { - intNum, err := s.Int64() - if err != nil { - log.Panic("invalid Column value, please report a bug", zap.Any("col", c), zap.Error(err)) - } - c.Value = uint64(intNum) - } - } - return c -} - -type messageKey struct { - TS uint64 `json:"ts"` - Schema string `json:"scm,omitempty"` - Table string `json:"tbl,omitempty"` - RowID int64 `json:"rid,omitempty"` - Partition *int64 `json:"ptn,omitempty"` -} - -// Encode the messageKey. -func (m *messageKey) Encode() ([]byte, error) { - return json.Marshal(m) -} - -// Decode the messageKey. -func (m *messageKey) Decode(data []byte) error { - return json.Unmarshal(data, m) -} - -// MessageDDL represents the ddl changes. -type MessageDDL struct { - Query string `json:"q"` - Type timodel.ActionType `json:"t"` -} - -// Encode the DDL message. -func (m *MessageDDL) Encode() ([]byte, error) { - return json.Marshal(m) -} - -// Decode the DDL message. -func (m *MessageDDL) Decode(data []byte) error { - return json.Unmarshal(data, m) -} - -// MessageRow represents the row changes in same commit ts. -type MessageRow struct { - Update map[string]Column `json:"u,omitempty"` - PreColumns map[string]Column `json:"p,omitempty"` - Delete map[string]Column `json:"d,omitempty"` -} - -// Encode the Row message. -func (m *MessageRow) Encode() ([]byte, error) { - return json.Marshal(m) -} - -// Decode the Row message. -func (m *MessageRow) Decode(data []byte) error { - decoder := json.NewDecoder(bytes.NewReader(data)) - decoder.UseNumber() - err := decoder.Decode(m) - if err != nil { - return errors.Trace(err) - } - for colName, column := range m.Update { - m.Update[colName] = formatColumnVal(column) - } - for colName, column := range m.Delete { - m.Delete[colName] = formatColumnVal(column) - } - for colName, column := range m.PreColumns { - m.PreColumns[colName] = formatColumnVal(column) - } - return nil -} - -// SortItem represents a DDL item or Row changed item. -type SortItem struct { - ItemType ItemType - Data interface{} - Schema string - Table string - RowID int64 - TS uint64 -} - -// LessThan return whether it has smaller commit ts than other item. -func (s *SortItem) LessThan(other *SortItem) bool { - if other != nil { - return s.TS < other.TS - } - return true -} - -// JSONEventBatchMixedDecoder decodes the byte of a batch into the original messages. -type JSONEventBatchMixedDecoder struct { - mixedBytes []byte -} - -func (b *JSONEventBatchMixedDecoder) decodeNextKey() (*messageKey, error) { - keyLen := binary.BigEndian.Uint64(b.mixedBytes[:8]) - key := b.mixedBytes[8 : keyLen+8] - // drop value bytes - msgKey := new(messageKey) - err := msgKey.Decode(key) - if err != nil { - return nil, errors.Trace(err) - } - b.mixedBytes = b.mixedBytes[keyLen+8:] - return msgKey, nil -} - -// NextEvent return next item depends on type. 
-func (b *JSONEventBatchMixedDecoder) NextEvent(itemType ItemType) (*SortItem, error) { - if !b.HasNext() { - return nil, nil - } - nextKey, err := b.decodeNextKey() - if err != nil { - return nil, errors.Trace(err) - } - - valueLen := binary.BigEndian.Uint64(b.mixedBytes[:8]) - value := b.mixedBytes[8 : valueLen+8] - b.mixedBytes = b.mixedBytes[valueLen+8:] - - var m interface{} - if itemType == DDL { - m = new(MessageDDL) - if err := m.(*MessageDDL).Decode(value); err != nil { - return nil, errors.Trace(err) - } - } else if itemType == RowChanged { - m = new(MessageRow) - if err := m.(*MessageRow).Decode(value); err != nil { - return nil, errors.Trace(err) - } - } - - item := &SortItem{ - ItemType: itemType, - Data: m, - Schema: nextKey.Schema, - Table: nextKey.Table, - TS: nextKey.TS, - RowID: nextKey.RowID, - } - return item, nil -} - -// HasNext represents whether it has next kv to decode. -func (b *JSONEventBatchMixedDecoder) HasNext() bool { - return len(b.mixedBytes) > 0 -} - -// NewJSONEventBatchDecoder creates a new JSONEventBatchDecoder. -func NewJSONEventBatchDecoder(data []byte) (*JSONEventBatchMixedDecoder, error) { - if len(data) == 0 { - return nil, nil - } - version := binary.BigEndian.Uint64(data[:8]) - data = data[8:] - if version != BatchVersion1 { - return nil, errors.Annotate(berrors.ErrPiTRInvalidCDCLogFormat, "unexpected key format version") - } - return &JSONEventBatchMixedDecoder{ - mixedBytes: data, - }, nil -} diff --git a/br/pkg/cdclog/decoder_test.go b/br/pkg/cdclog/decoder_test.go deleted file mode 100644 index 5af40fd6f0f4a..0000000000000 --- a/br/pkg/cdclog/decoder_test.go +++ /dev/null @@ -1,169 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
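// Editorial sketch (not part of the patch above): how the removed decoder was
// meant to be driven. A cdclog batch is an 8-byte big-endian version header
// followed by repeated [8-byte key length | JSON key | 8-byte value length |
// JSON value] entries; NewJSONEventBatchDecoder, HasNext, NextEvent, DDL and
// SortItem are the names from decoder.go above, while drainDDLEvents and raw
// are hypothetical.
func drainDDLEvents(raw []byte) ([]*SortItem, error) {
	dec, err := NewJSONEventBatchDecoder(raw)
	if err != nil {
		return nil, err
	}
	var items []*SortItem
	// NewJSONEventBatchDecoder returns a nil decoder for empty input.
	for dec != nil && dec.HasNext() {
		item, err := dec.NextEvent(DDL)
		if err != nil {
			return nil, err
		}
		items = append(items, item)
	}
	return items, nil
}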
- -package cdclog - -import ( - "encoding/binary" - "encoding/json" - "testing" - - "github.com/pingcap/check" - "github.com/pingcap/tidb/parser/mysql" -) - -func Test(t *testing.T) { check.TestingT(t) } - -type batchSuite struct { - ddlEvents []*MessageDDL - ddlDecodeItem []*SortItem - - rowEvents []*MessageRow - rowDecodeItem []*SortItem -} - -var updateCols = map[string]Column{ - // json.Number - "id": {Type: 1, Value: "1"}, - "name": {Type: 2, Value: "test"}, -} - -var _ = check.Suite(&batchSuite{ - ddlEvents: []*MessageDDL{ - {"drop table event", 4}, - {"create table event", 3}, - {"drop table event", 5}, - }, - ddlDecodeItem: []*SortItem{ - {ItemType: DDL, Schema: "test", Table: "event", TS: 1, Data: MessageDDL{"drop table event", 4}}, - {ItemType: DDL, Schema: "test", Table: "event", TS: 1, Data: MessageDDL{"create table event", 3}}, - {ItemType: DDL, Schema: "test", Table: "event", TS: 1, Data: MessageDDL{"drop table event", 5}}, - }, - - rowEvents: []*MessageRow{ - {Update: updateCols}, - {PreColumns: updateCols}, - {Delete: updateCols}, - }, - rowDecodeItem: []*SortItem{ - {ItemType: RowChanged, Schema: "test", Table: "event", TS: 1, RowID: 0, Data: MessageRow{Update: updateCols}}, - {ItemType: RowChanged, Schema: "test", Table: "event", TS: 1, RowID: 1, Data: MessageRow{PreColumns: updateCols}}, - {ItemType: RowChanged, Schema: "test", Table: "event", TS: 1, RowID: 2, Data: MessageRow{Delete: updateCols}}, - }, -}) - -func buildEncodeRowData(events []*MessageRow) []byte { - var versionByte [8]byte - binary.BigEndian.PutUint64(versionByte[:], BatchVersion1) - data := append(make([]byte, 0), versionByte[:]...) - key := messageKey{ - TS: 1, - Schema: "test", - Table: "event", - } - var LenByte [8]byte - for i := 0; i < len(events); i++ { - key.RowID = int64(i) - keyBytes, _ := key.Encode() - binary.BigEndian.PutUint64(LenByte[:], uint64(len(keyBytes))) - data = append(data, LenByte[:]...) - data = append(data, keyBytes...) - eventBytes, _ := events[i].Encode() - binary.BigEndian.PutUint64(LenByte[:], uint64(len(eventBytes))) - data = append(data, LenByte[:]...) - data = append(data, eventBytes...) - } - return data -} - -func buildEncodeDDLData(events []*MessageDDL) []byte { - var versionByte [8]byte - binary.BigEndian.PutUint64(versionByte[:], BatchVersion1) - data := append(make([]byte, 0), versionByte[:]...) - key := messageKey{ - TS: 1, - Schema: "test", - Table: "event", - } - var LenByte [8]byte - for i := 0; i < len(events); i++ { - keyBytes, _ := key.Encode() - binary.BigEndian.PutUint64(LenByte[:], uint64(len(keyBytes))) - data = append(data, LenByte[:]...) - data = append(data, keyBytes...) - eventBytes, _ := events[i].Encode() - binary.BigEndian.PutUint64(LenByte[:], uint64(len(eventBytes))) - data = append(data, LenByte[:]...) - data = append(data, eventBytes...) 
- } - return data -} - -func (s *batchSuite) TestDecoder(c *check.C) { - var item *SortItem - - data := buildEncodeDDLData(s.ddlEvents) - decoder, err := NewJSONEventBatchDecoder(data) - c.Assert(err, check.IsNil) - index := 0 - for { - hasNext := decoder.HasNext() - if !hasNext { - break - } - item, err = decoder.NextEvent(DDL) - c.Assert(err, check.IsNil) - c.Assert(item.Data.(*MessageDDL), check.DeepEquals, s.ddlEvents[index]) - index++ - } - - data = buildEncodeRowData(s.rowEvents) - decoder, err = NewJSONEventBatchDecoder(data) - c.Assert(err, check.IsNil) - index = 0 - for { - hasNext := decoder.HasNext() - if !hasNext { - break - } - item, err = decoder.NextEvent(RowChanged) - c.Assert(err, check.IsNil) - c.Assert(item.Data.(*MessageRow), check.DeepEquals, s.rowEvents[index]) - c.Assert(item.RowID, check.Equals, int64(index)) - index++ - } -} - -func (s *batchSuite) TestColumn(c *check.C) { - // test varbinary columns (same type with varchar 15) - col1 := Column{Type: mysql.TypeVarchar, Flag: BinaryFlag, Value: "\\x00\\x01"} - col1 = formatColumnVal(col1) - dat, err := col1.ToDatum() - c.Assert(err, check.IsNil) - c.Assert(dat.GetString(), check.Equals, "\x00\x01") - - // test binary columns (same type with varchar 254) - col2 := Column{Type: mysql.TypeString, Flag: BinaryFlag, Value: "test\\ttest"} - col2 = formatColumnVal(col2) - dat, err = col2.ToDatum() - c.Assert(err, check.IsNil) - c.Assert(dat.GetString(), check.Equals, "test\ttest") - - // test year columns - val := json.Number("2020") - colYear := Column{Type: mysql.TypeYear, Value: val} - dat, err = colYear.ToDatum() - c.Assert(err, check.IsNil) - c.Assert(dat.GetInt64(), check.Equals, int64(2020)) -} diff --git a/br/pkg/cdclog/puller.go b/br/pkg/cdclog/puller.go deleted file mode 100644 index 1c823c8de6c44..0000000000000 --- a/br/pkg/cdclog/puller.go +++ /dev/null @@ -1,184 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cdclog - -import ( - "context" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - "github.com/pingcap/tidb/br/pkg/storage" -) - -// EventPuller pulls next event in ts order. -type EventPuller struct { - ddlDecoder *JSONEventBatchMixedDecoder - rowChangedDecoder *JSONEventBatchMixedDecoder - currentDDLItem *SortItem - currentRowChangedItem *SortItem - - schema string - table string - - storage storage.ExternalStorage - ddlFiles []string - rowChangedFiles []string - - ddlFileIndex int - rowChangedFileIndex int -} - -// NewEventPuller create eventPuller by given log files, we assume files come in ts order. 
-func NewEventPuller( - ctx context.Context, - schema string, - table string, - ddlFiles []string, - rowChangedFiles []string, - storage storage.ExternalStorage) (*EventPuller, error) { - var ( - ddlDecoder *JSONEventBatchMixedDecoder - ddlFileIndex int - rowChangedDecoder *JSONEventBatchMixedDecoder - rowFileIndex int - ) - if len(ddlFiles) == 0 { - log.Info("There is no ddl file to restore") - } else { - data, err := storage.ReadFile(ctx, ddlFiles[0]) - if err != nil { - return nil, errors.Trace(err) - } - if len(data) != 0 { - ddlFileIndex++ - ddlDecoder, err = NewJSONEventBatchDecoder(data) - if err != nil { - return nil, errors.Trace(err) - } - } - } - - if len(rowChangedFiles) == 0 { - log.Info("There is no row changed file to restore") - } else { - data, err := storage.ReadFile(ctx, rowChangedFiles[0]) - if err != nil { - return nil, errors.Trace(err) - } - if len(data) != 0 { - rowFileIndex++ - rowChangedDecoder, err = NewJSONEventBatchDecoder(data) - if err != nil { - return nil, errors.Trace(err) - } - } - } - - return &EventPuller{ - schema: schema, - table: table, - - ddlDecoder: ddlDecoder, - rowChangedDecoder: rowChangedDecoder, - - ddlFiles: ddlFiles, - rowChangedFiles: rowChangedFiles, - ddlFileIndex: ddlFileIndex, - rowChangedFileIndex: rowFileIndex, - - storage: storage, - }, nil -} - -// PullOneEvent pulls one event in ts order. -// The Next event which can be DDL item or Row changed Item depends on next commit ts. -func (e *EventPuller) PullOneEvent(ctx context.Context) (*SortItem, error) { - var ( - err error - data []byte - ) - // ddl exists - if e.ddlDecoder != nil { - // current file end, read next file if next file exists - if !e.ddlDecoder.HasNext() && e.ddlFileIndex < len(e.ddlFiles) { - path := e.ddlFiles[e.ddlFileIndex] - data, err = e.storage.ReadFile(ctx, path) - if err != nil { - return nil, errors.Trace(err) - } - if len(data) > 0 { - e.ddlFileIndex++ - e.ddlDecoder, err = NewJSONEventBatchDecoder(data) - if err != nil { - return nil, errors.Trace(err) - } - } - } - // set current DDL item first - if e.currentDDLItem == nil { - e.currentDDLItem, err = e.ddlDecoder.NextEvent(DDL) - if err != nil { - return nil, errors.Trace(err) - } - } - } - // dml exists - if e.rowChangedDecoder != nil { - // current file end, read next file if next file exists - if !e.rowChangedDecoder.HasNext() && e.rowChangedFileIndex < len(e.rowChangedFiles) { - path := e.rowChangedFiles[e.rowChangedFileIndex] - data, err = e.storage.ReadFile(ctx, path) - if err != nil { - return nil, errors.Trace(err) - } - if len(data) != 0 { - e.rowChangedFileIndex++ - e.rowChangedDecoder, err = NewJSONEventBatchDecoder(data) - if err != nil { - return nil, errors.Trace(err) - } - } - } - if e.currentRowChangedItem == nil { - e.currentRowChangedItem, err = e.rowChangedDecoder.NextEvent(RowChanged) - if err != nil { - return nil, errors.Trace(err) - } - } - } - - var returnItem *SortItem - switch { - case e.currentDDLItem != nil: - if e.currentDDLItem.LessThan(e.currentRowChangedItem) { - returnItem = e.currentDDLItem - e.currentDDLItem, err = e.ddlDecoder.NextEvent(DDL) - if err != nil { - return nil, errors.Trace(err) - } - break - } - fallthrough - case e.currentRowChangedItem != nil: - returnItem = e.currentRowChangedItem - e.currentRowChangedItem, err = e.rowChangedDecoder.NextEvent(RowChanged) - if err != nil { - return nil, errors.Trace(err) - } - default: - log.Info("puller finished") - } - return returnItem, nil -} diff --git a/br/pkg/restore/ingester.go b/br/pkg/restore/ingester.go deleted 
file mode 100644 index 28e3940ee2bf5..0000000000000 --- a/br/pkg/restore/ingester.go +++ /dev/null @@ -1,607 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package restore - -import ( - "bytes" - "context" - "crypto/tls" - "strings" - "sync" - "time" - - "github.com/google/uuid" - "github.com/pingcap/errors" - "github.com/pingcap/failpoint" - "github.com/pingcap/kvproto/pkg/errorpb" - sst "github.com/pingcap/kvproto/pkg/import_sstpb" - "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/log" - "github.com/pingcap/tidb/br/pkg/conn" - berrors "github.com/pingcap/tidb/br/pkg/errors" - "github.com/pingcap/tidb/br/pkg/kv" - "github.com/pingcap/tidb/br/pkg/logutil" - "github.com/pingcap/tidb/br/pkg/membuf" - "github.com/pingcap/tidb/br/pkg/utils" - "github.com/tikv/pd/pkg/codec" - "go.uber.org/zap" - "google.golang.org/grpc" - "google.golang.org/grpc/backoff" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/keepalive" -) - -const ( - dialTimeout = 5 * time.Second - - gRPCKeepAliveTime = 10 * time.Second - gRPCKeepAliveTimeout = 3 * time.Second - - // See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360 - regionMaxKeyCount = 1440000 - - defaultSplitSize = 96 * 1024 * 1024 -) - -type retryType int - -const ( - retryNone retryType = iota - retryWrite - retryIngest -) - -type gRPCConns struct { - mu sync.Mutex - conns map[uint64]*conn.Pool - tcpConcurrency int -} - -func (conns *gRPCConns) Close() { - conns.mu.Lock() - defer conns.mu.Unlock() - - for _, cp := range conns.conns { - cp.Close() - } -} - -// Ingester writes and ingests kv to TiKV. -// which used for both BR log restore and Lightning local backend. -type Ingester struct { - // commit ts appends to key in tikv - TS uint64 - - tlsConf *tls.Config - conns gRPCConns - - splitCli SplitClient - WorkerPool *utils.WorkerPool - - batchWriteKVPairs int - regionSplitSize int64 -} - -// NewIngester creates Ingester. 
-func NewIngester( - splitCli SplitClient, cfg concurrencyCfg, commitTS uint64, tlsConf *tls.Config, -) *Ingester { - workerPool := utils.NewWorkerPool(cfg.IngestConcurrency, "ingest worker") - return &Ingester{ - tlsConf: tlsConf, - conns: gRPCConns{ - tcpConcurrency: cfg.TCPConcurrency, - conns: make(map[uint64]*conn.Pool), - }, - splitCli: splitCli, - WorkerPool: workerPool, - batchWriteKVPairs: cfg.BatchWriteKVPairs, - regionSplitSize: defaultSplitSize, - TS: commitTS, - } -} - -func (i *Ingester) makeConn(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) { - store, err := i.splitCli.GetStore(ctx, storeID) - if err != nil { - return nil, errors.Trace(err) - } - opt := grpc.WithInsecure() - if i.tlsConf != nil { - opt = grpc.WithTransportCredentials(credentials.NewTLS(i.tlsConf)) - } - ctx, cancel := context.WithTimeout(ctx, dialTimeout) - - bfConf := backoff.DefaultConfig - bfConf.MaxDelay = gRPCBackOffMaxDelay - // we should use peer address for tiflash. for tikv, peer address is empty - addr := store.GetPeerAddress() - if addr == "" { - addr = store.GetAddress() - } - grpcConn, err := grpc.DialContext( - ctx, - addr, - opt, - grpc.WithBlock(), - grpc.FailOnNonTempDialError(true), - grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}), - grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: gRPCKeepAliveTime, - Timeout: gRPCKeepAliveTimeout, - PermitWithoutStream: true, - }), - ) - cancel() - if err != nil { - return nil, errors.Trace(err) - } - return grpcConn, nil -} - -// write [start, end) kv in to tikv. -func (i *Ingester) writeAndIngestByRange( - ctxt context.Context, - iterProducer kv.IterProducer, - start []byte, - end []byte, - remainRanges *syncdRanges, -) error { - select { - case <-ctxt.Done(): - return ctxt.Err() - default: - } - iter := iterProducer.Produce(start, end) - iter.First() - pairStart := append([]byte{}, iter.Key()...) - iter.Last() - pairEnd := append([]byte{}, iter.Key()...) 
- if bytes.Compare(pairStart, pairEnd) > 0 { - log.Debug("There is no pairs in iterator", logutil.Key("start", start), - logutil.Key("end", end), logutil.Key("pairStart", pairStart), logutil.Key("pairEnd", pairEnd)) - return nil - } - var regions []*RegionInfo - var err error - ctx, cancel := context.WithCancel(ctxt) - defer cancel() - -WriteAndIngest: - for retry := 0; retry < maxRetryTimes; { - if retry != 0 { - select { - case <-time.After(time.Second): - case <-ctx.Done(): - return ctx.Err() - } - } - startKey := codec.EncodeBytes(pairStart) - endKey := codec.EncodeBytes(kv.NextKey(pairEnd)) - regions, err = PaginateScanRegion(ctx, i.splitCli, startKey, endKey, 128) - if err != nil || len(regions) == 0 { - log.Warn("scan region failed", zap.Error(err), zap.Int("region_len", len(regions)), - logutil.Key("startKey", startKey), logutil.Key("endKey", endKey), zap.Int("retry", retry)) - retry++ - continue - } - - for _, region := range regions { - log.Debug("get region", zap.Int("retry", retry), logutil.Key("startKey", startKey), - logutil.Key("endKey", endKey), logutil.Region(region.Region)) - w := i.WorkerPool.ApplyWorker() - var rg *Range - rg, err = i.writeAndIngestPairs(ctx, iter, region, pairStart, pairEnd) - i.WorkerPool.RecycleWorker(w) - if err != nil { - _, regionStart, _ := codec.DecodeBytes(region.Region.StartKey) - // if we have at least succeeded one region, retry without increasing the retry count - if bytes.Compare(regionStart, pairStart) > 0 { - pairStart = regionStart - } else { - retry++ - } - log.Info("retry write and ingest kv pairs", logutil.Key("startKey", pairStart), - logutil.Key("endKey", pairEnd), zap.Error(err), zap.Int("retry", retry)) - continue WriteAndIngest - } - if rg != nil { - remainRanges.add(*rg) - } - } - break - } - return err -} - -func (i *Ingester) writeAndIngestPairs( - ctx context.Context, - iter kv.Iter, - region *RegionInfo, - start, end []byte, -) (*Range, error) { - var err error - var remainRange *Range -loopWrite: - for retry := 0; retry < maxRetryTimes; retry++ { - select { - case <-ctx.Done(): - return remainRange, ctx.Err() - default: - } - var metas []*sst.SSTMeta - metas, remainRange, err = i.writeToTiKV(ctx, iter, region, start, end) - if err != nil { - log.Warn("write to tikv failed", zap.Error(err)) - return nil, err - } - - for _, meta := range metas { - errCnt := 0 - for errCnt < maxRetryTimes { - log.Debug("ingest meta", zap.Reflect("meta", meta)) - var resp *sst.IngestResponse - resp, err = i.ingest(ctx, meta, region) - if err != nil { - log.Warn("ingest failed", zap.Error(err), logutil.SSTMeta(meta), - zap.Reflect("region", region)) - errCnt++ - continue - } - failpoint.Inject("FailIngestMeta", func(val failpoint.Value) { - switch val.(string) { - case "notleader": - resp.Error.NotLeader = &errorpb.NotLeader{ - RegionId: region.Region.Id, Leader: region.Leader, - } - case "epochnotmatch": - resp.Error.EpochNotMatch = &errorpb.EpochNotMatch{ - CurrentRegions: []*metapb.Region{region.Region}, - } - } - }) - var retryTy retryType - var newRegion *RegionInfo - retryTy, newRegion, err = i.isIngestRetryable(ctx, resp, region, meta) - if err == nil { - // ingest next meta - break - } - switch retryTy { - case retryNone: - log.Warn("ingest failed and do not retry", zap.Error(err), logutil.SSTMeta(meta), - zap.Reflect("region", region)) - // met non-retryable error retry whole Write procedure - return remainRange, err - case retryWrite: - region = newRegion - continue loopWrite - case retryIngest: - region = newRegion - continue - } - } 
- } - - if err != nil { - log.Warn("write and ingest region, will retry import full range", zap.Error(err), - logutil.Region(region.Region), logutil.Key("start", start), logutil.Key("end", end)) - } - return remainRange, errors.Trace(err) - } - - return remainRange, errors.Trace(err) -} - -// writeToTiKV writer engine key-value pairs to tikv and return the sst meta generated by tikv. -// we don't need to do cleanup for the pairs written to tikv if encounters an error, -// tikv will takes the responsibility to do so. -func (i *Ingester) writeToTiKV( - ctx context.Context, - iter kv.Iter, - region *RegionInfo, - start, end []byte, -) ([]*sst.SSTMeta, *Range, error) { - begin := time.Now() - regionRange := intersectRange(region.Region, Range{Start: start, End: end}) - - iter.Seek(regionRange.Start) - firstKey := codec.EncodeBytes(iter.Key()) - var lastKey []byte - if iter.Seek(regionRange.End) { - lastKey = codec.EncodeBytes(iter.Key()) - } else { - iter.Last() - log.Info("region range's end key not in iter, shouldn't happen", - zap.Any("region range", regionRange), logutil.Key("iter last", iter.Key())) - lastKey = codec.EncodeBytes(kv.NextKey(iter.Key())) - } - - if bytes.Compare(firstKey, lastKey) > 0 { - log.Info("keys within region is empty, skip ingest", logutil.Key("start", start), - logutil.Key("regionStart", region.Region.StartKey), logutil.Key("end", end), - logutil.Key("regionEnd", region.Region.EndKey)) - return nil, nil, nil - } - - u := uuid.New() - meta := &sst.SSTMeta{ - Uuid: u[:], - RegionId: region.Region.GetId(), - RegionEpoch: region.Region.GetRegionEpoch(), - Range: &sst.Range{ - Start: firstKey, - End: lastKey, - }, - } - - leaderID := region.Leader.GetId() - clients := make([]sst.ImportSST_WriteClient, 0, len(region.Region.GetPeers())) - requests := make([]*sst.WriteRequest, 0, len(region.Region.GetPeers())) - for _, peer := range region.Region.GetPeers() { - cli, err := i.getImportClient(ctx, peer) - if err != nil { - return nil, nil, err - } - - wstream, err := cli.Write(ctx) - if err != nil { - return nil, nil, errors.Trace(err) - } - - // Bind uuid for this write request - req := &sst.WriteRequest{ - Chunk: &sst.WriteRequest_Meta{ - Meta: meta, - }, - } - if err = wstream.Send(req); err != nil { - return nil, nil, errors.Trace(err) - } - req.Chunk = &sst.WriteRequest_Batch{ - Batch: &sst.WriteBatch{ - CommitTs: i.TS, - }, - } - clients = append(clients, wstream) - requests = append(requests, req) - } - - bytesBuf := membuf.NewBuffer() - defer bytesBuf.Destroy() - pairs := make([]*sst.Pair, 0, i.batchWriteKVPairs) - count := 0 - size := int64(0) - totalCount := 0 - firstLoop := true - regionMaxSize := i.regionSplitSize * 4 / 3 - - for iter.Seek(regionRange.Start); iter.Valid() && bytes.Compare(iter.Key(), regionRange.End) <= 0; iter.Next() { - size += int64(len(iter.Key()) + len(iter.Value())) - // here we reuse the `*sst.Pair`s to optimize object allocation - if firstLoop { - pair := &sst.Pair{ - Key: bytesBuf.AddBytes(iter.Key()), - Value: bytesBuf.AddBytes(iter.Value()), - Op: iter.OpType(), - } - pairs = append(pairs, pair) - } else { - pairs[count].Key = bytesBuf.AddBytes(iter.Key()) - pairs[count].Value = bytesBuf.AddBytes(iter.Value()) - } - count++ - totalCount++ - - if count >= i.batchWriteKVPairs { - for i := range clients { - requests[i].Chunk.(*sst.WriteRequest_Batch).Batch.Pairs = pairs[:count] - if err := clients[i].Send(requests[i]); err != nil { - return nil, nil, errors.Trace(err) - } - } - count = 0 - bytesBuf.Reset() - firstLoop = false - } - if 
size >= regionMaxSize || totalCount >= regionMaxKeyCount { - break - } - } - - if count > 0 { - for i := range clients { - requests[i].Chunk.(*sst.WriteRequest_Batch).Batch.Pairs = pairs[:count] - if err := clients[i].Send(requests[i]); err != nil { - return nil, nil, errors.Trace(err) - } - } - } - - if iter.Error() != nil { - return nil, nil, errors.Trace(iter.Error()) - } - - var leaderPeerMetas []*sst.SSTMeta - for i, wStream := range clients { - if resp, closeErr := wStream.CloseAndRecv(); closeErr != nil { - return nil, nil, errors.Trace(closeErr) - } else if leaderID == region.Region.Peers[i].GetId() { - leaderPeerMetas = resp.Metas - log.Debug("get metas after write kv stream to tikv", zap.Reflect("metas", leaderPeerMetas)) - } - } - - // if there is not leader currently, we should directly return an error - if leaderPeerMetas == nil { - log.Warn("write to tikv no leader", zap.Reflect("region", region), - zap.Uint64("leader_id", leaderID), zap.Reflect("meta", meta), - zap.Int("kv_pairs", totalCount), zap.Int64("total_bytes", size)) - return nil, nil, errors.Annotatef(berrors.ErrPDLeaderNotFound, "write to tikv with no leader returned, region '%d', leader: %d", - region.Region.Id, leaderID) - } - - log.Debug("write to kv", zap.Reflect("region", region), zap.Uint64("leader", leaderID), - zap.Reflect("meta", meta), zap.Reflect("return metas", leaderPeerMetas), - zap.Int("kv_pairs", totalCount), zap.Int64("total_bytes", size), - zap.Int64("buf_size", bytesBuf.TotalSize()), - zap.Stringer("takeTime", time.Since(begin))) - - var remainRange *Range - if iter.Valid() && iter.Next() { - firstKey := append([]byte{}, iter.Key()...) - remainRange = &Range{Start: firstKey, End: regionRange.End} - log.Info("write to tikv partial finish", zap.Int("count", totalCount), - zap.Int64("size", size), zap.Binary("startKey", regionRange.Start), zap.Binary("endKey", regionRange.End), - zap.Binary("remainStart", remainRange.Start), zap.Binary("remainEnd", remainRange.End), - zap.Reflect("region", region)) - } - - return leaderPeerMetas, remainRange, nil -} - -func (i *Ingester) ingest(ctx context.Context, meta *sst.SSTMeta, region *RegionInfo) (*sst.IngestResponse, error) { - leader := region.Leader - if leader == nil { - leader = region.Region.GetPeers()[0] - } - - cli, err := i.getImportClient(ctx, leader) - if err != nil { - return nil, err - } - reqCtx := &kvrpcpb.Context{ - RegionId: region.Region.GetId(), - RegionEpoch: region.Region.GetRegionEpoch(), - Peer: leader, - } - - req := &sst.IngestRequest{ - Context: reqCtx, - Sst: meta, - } - resp, err := cli.Ingest(ctx, req) - if err != nil { - return nil, errors.Trace(err) - } - return resp, nil -} - -func (i *Ingester) getImportClient(ctx context.Context, peer *metapb.Peer) (sst.ImportSSTClient, error) { - i.conns.mu.Lock() - defer i.conns.mu.Unlock() - - conn, err := i.getGrpcConnLocked(ctx, peer.GetStoreId()) - if err != nil { - return nil, errors.Trace(err) - } - return sst.NewImportSSTClient(conn), nil -} - -func (i *Ingester) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) { - if _, ok := i.conns.conns[storeID]; !ok { - i.conns.conns[storeID] = conn.NewConnPool(i.conns.tcpConcurrency, func(ctx context.Context) (*grpc.ClientConn, error) { - return i.makeConn(ctx, storeID) - }) - } - return i.conns.conns[storeID].Get(ctx) -} - -func (i *Ingester) isIngestRetryable( - ctx context.Context, - resp *sst.IngestResponse, - region *RegionInfo, - meta *sst.SSTMeta, -) (retryType, *RegionInfo, error) { - if resp.GetError() 
== nil { - return retryNone, nil, nil - } - - getRegion := func() (*RegionInfo, error) { - for retry := 0; ; retry++ { - newRegion, err := i.splitCli.GetRegion(ctx, region.Region.GetStartKey()) - if err != nil { - return nil, errors.Trace(err) - } - if newRegion != nil { - return newRegion, nil - } - log.Warn("get region by key return nil, will retry", zap.Reflect("region", region), - zap.Int("retry", retry)) - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-time.After(time.Second): - } - } - } - - var newRegion *RegionInfo - var err error - switch errPb := resp.GetError(); { - case errPb.NotLeader != nil: - if newLeader := errPb.GetNotLeader().GetLeader(); newLeader != nil { - newRegion = &RegionInfo{ - Leader: newLeader, - Region: region.Region, - } - } else { - newRegion, err = getRegion() - if err != nil { - return retryNone, nil, errors.Trace(err) - } - } - return retryIngest, newRegion, errors.Annotatef(berrors.ErrKVNotLeader, "not leader: %s", errPb.GetMessage()) - case errPb.EpochNotMatch != nil: - if currentRegions := errPb.GetEpochNotMatch().GetCurrentRegions(); currentRegions != nil { - var currentRegion *metapb.Region - for _, r := range currentRegions { - if insideRegion(r, meta) { - currentRegion = r - break - } - } - if currentRegion != nil { - var newLeader *metapb.Peer - for _, p := range currentRegion.Peers { - if p.GetStoreId() == region.Leader.GetStoreId() { - newLeader = p - break - } - } - if newLeader != nil { - newRegion = &RegionInfo{ - Leader: newLeader, - Region: currentRegion, - } - } - } - } - retryTy := retryNone - if newRegion != nil { - retryTy = retryWrite - } - return retryTy, newRegion, errors.Annotatef(berrors.ErrKVEpochNotMatch, "epoch not match: %s", errPb.GetMessage()) - case strings.Contains(errPb.Message, "raft: proposal dropped"): - // TODO: we should change 'Raft raft: proposal dropped' to a error type like 'NotLeader' - newRegion, err = getRegion() - if err != nil { - return retryNone, nil, errors.Trace(err) - } - return retryIngest, newRegion, errors.Annotate(berrors.ErrKVUnknown, errPb.GetMessage()) - } - return retryNone, nil, errors.Annotatef(berrors.ErrKVUnknown, "non-retryable error: %s", resp.GetError().GetMessage()) -} diff --git a/br/pkg/restore/log_client.go b/br/pkg/restore/log_client.go deleted file mode 100644 index fd71a06903ad1..0000000000000 --- a/br/pkg/restore/log_client.go +++ /dev/null @@ -1,761 +0,0 @@ -// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. 
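// Editorial sketch (not part of the patch): the retry policy condensed from
// Ingester.writeAndIngestPairs / isIngestRetryable in ingester.go above.
// retryIngest re-sends the same SST metas (e.g. after a leader change),
// retryWrite rewrites the data for a region whose shape changed, and
// retryNone aborts. ingestWithRetry and its callback signatures are
// hypothetical; the retryType constants are the ones defined above.
func ingestWithRetry(maxRetry int, write func() error, ingestOnce func() (retryType, error)) error {
	var err error
	for attempt := 0; attempt < maxRetry; attempt++ {
		if err = write(); err != nil {
			return err
		}
	ingestLoop:
		for i := 0; i < maxRetry; i++ {
			var ty retryType
			ty, err = ingestOnce()
			if err == nil {
				return nil
			}
			switch ty {
			case retryIngest:
				continue // refreshed leader/region info; ingest the same metas again
			case retryWrite:
				break ingestLoop // region changed; rewrite the range, then ingest again
			default: // retryNone: non-retryable error
				return err
			}
		}
	}
	return err
}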
- -package restore - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "path/filepath" - "sort" - "strconv" - "strings" - "sync" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - filter "github.com/pingcap/tidb-tools/pkg/table-filter" - "github.com/pingcap/tidb/br/pkg/cdclog" - berrors "github.com/pingcap/tidb/br/pkg/errors" - "github.com/pingcap/tidb/br/pkg/kv" - "github.com/pingcap/tidb/br/pkg/storage" - "github.com/pingcap/tidb/br/pkg/utils" - "github.com/pingcap/tidb/domain" - "github.com/pingcap/tidb/meta/autoid" - "github.com/pingcap/tidb/parser/model" - titable "github.com/pingcap/tidb/table" - "github.com/tikv/client-go/v2/oracle" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" -) - -const ( - tableLogPrefix = "t_" - logPrefix = "cdclog" - - metaFile = "log.meta" - ddlEventsDir = "ddls" - ddlFilePrefix = "ddl" - - maxUint64 = ^uint64(0) - - maxRetryTimes = 3 -) - -// concurrencyCfg set by user, which can adjust the restore performance. -type concurrencyCfg struct { - BatchWriteKVPairs int - BatchFlushKVPairs int - BatchFlushKVSize int64 - Concurrency uint - TCPConcurrency int - IngestConcurrency uint -} - -// LogMeta represents the log.meta generated by cdc log backup. -type LogMeta struct { - Names map[int64]string `json:"names"` - GlobalResolvedTS uint64 `json:"global_resolved_ts"` -} - -// LogClient sends requests to restore files. -type LogClient struct { - // lock DDL execution - // TODO remove lock by using db session pool if necessary - ddlLock sync.Mutex - - restoreClient *Client - splitClient SplitClient - importerClient ImporterClient - - // ingester is used to write and ingest kvs to tikv. - // lightning has the simlar logic and can reuse it. - ingester *Ingester - - // range of log backup - startTS uint64 - endTS uint64 - - concurrencyCfg concurrencyCfg - // meta info parsed from log backup - meta *LogMeta - eventPullers map[int64]*cdclog.EventPuller - tableBuffers map[int64]*cdclog.TableBuffer - - tableFilter filter.Filter - - // a map to store all drop schema ts, use it as a filter - dropTSMap sync.Map -} - -// NewLogRestoreClient returns a new LogRestoreClient. -func NewLogRestoreClient( - ctx context.Context, - restoreClient *Client, - startTS uint64, - endTS uint64, - tableFilter filter.Filter, - concurrency uint, - batchFlushPairs int, - batchFlushSize int64, - batchWriteKVPairs int, -) (*LogClient, error) { - var err error - if endTS == 0 { - // means restore all log data, - // so we get current ts from restore cluster - endTS, err = restoreClient.GetTS(ctx) - if err != nil { - return nil, errors.Trace(err) - } - } - - tlsConf := restoreClient.GetTLSConfig() - splitClient := NewSplitClient(restoreClient.GetPDClient(), tlsConf) - importClient := NewImportClient(splitClient, tlsConf, restoreClient.keepaliveConf) - - cfg := concurrencyCfg{ - Concurrency: concurrency, - BatchFlushKVPairs: batchFlushPairs, - BatchFlushKVSize: batchFlushSize, - BatchWriteKVPairs: batchWriteKVPairs, - IngestConcurrency: concurrency * 16, - TCPConcurrency: int(concurrency) * 16, - } - - // commitTS append into encode key. we use a unified ts for once log restore. 
- commitTS := oracle.ComposeTS(time.Now().Unix()*1000, 0) - lc := &LogClient{ - restoreClient: restoreClient, - splitClient: splitClient, - importerClient: importClient, - startTS: startTS, - endTS: endTS, - concurrencyCfg: cfg, - meta: new(LogMeta), - eventPullers: make(map[int64]*cdclog.EventPuller), - tableBuffers: make(map[int64]*cdclog.TableBuffer), - tableFilter: tableFilter, - ingester: NewIngester(splitClient, cfg, commitTS, tlsConf), - } - return lc, nil -} - -// ResetTSRange used for test. -func (l *LogClient) ResetTSRange(startTS uint64, endTS uint64) { - l.startTS = startTS - l.endTS = endTS -} - -func (l *LogClient) maybeTSInRange(ts uint64) bool { - // We choose the last event's ts as file name in cdclog when rotate. - // so even this file name's ts is larger than l.endTS, - // we still need to collect it, because it may have some events in this ts range. - // TODO: find another effective filter to collect files - return ts >= l.startTS -} - -func (l *LogClient) tsInRange(ts uint64) bool { - return l.startTS <= ts && ts <= l.endTS -} - -func (l *LogClient) shouldFilter(item *cdclog.SortItem) bool { - if val, ok := l.dropTSMap.Load(item.Schema); ok { - if val.(uint64) > item.TS { - return true - } - } - return false -} - -// NeedRestoreDDL determines whether to collect ddl file by ts range. -func (l *LogClient) NeedRestoreDDL(fileName string) (bool, error) { - names := strings.Split(fileName, ".") - if len(names) != 2 { - log.Warn("found wrong format of ddl file", zap.String("file", fileName)) - return false, nil - } - if names[0] != ddlFilePrefix { - log.Warn("file doesn't start with ddl", zap.String("file", fileName)) - return false, nil - } - ts, err := strconv.ParseUint(names[1], 10, 64) - if err != nil { - return false, errors.Trace(err) - } - - // According to https://docs.aws.amazon.com/AmazonS3/latest/dev/ListingKeysUsingAPIs.html - // list API return in UTF-8 binary order, so the cdc log create DDL file used - // maxUint64 - the first DDL event's commit ts as the file name to return the latest ddl file. - // see details at https://github.com/pingcap/ticdc/pull/826/files#diff-d2e98b3ed211b7b9bb7b6da63dd48758R81 - ts = maxUint64 - ts - - // In cdc, we choose the first event as the file name of DDL file. - // so if the file ts is large than endTS, we can skip to execute it. - // FIXME find a unified logic to filter row changes files and ddl files. 
- if ts <= l.endTS { - return true, nil - } - log.Info("filter ddl file by ts", zap.String("name", fileName), zap.Uint64("ts", ts)) - return false, nil -} - -func (l *LogClient) collectDDLFiles(ctx context.Context) ([]string, error) { - ddlFiles := make([]string, 0) - opt := &storage.WalkOption{ - SubDir: ddlEventsDir, - ListCount: -1, - } - err := l.restoreClient.storage.WalkDir(ctx, opt, func(path string, size int64) error { - fileName := filepath.Base(path) - shouldRestore, err := l.NeedRestoreDDL(fileName) - if err != nil { - return errors.Trace(err) - } - if shouldRestore { - ddlFiles = append(ddlFiles, path) - } - return nil - }) - if err != nil { - return nil, errors.Trace(err) - } - - sort.Sort(sort.Reverse(sort.StringSlice(ddlFiles))) - return ddlFiles, nil -} - -func (l *LogClient) isDBRelatedDDL(ddl *cdclog.MessageDDL) bool { - switch ddl.Type { - case model.ActionDropSchema, model.ActionCreateSchema, model.ActionModifySchemaCharsetAndCollate, model.ActionModifySchemaDefaultPlacement: - return true - } - return false -} - -func (l *LogClient) isDropTable(ddl *cdclog.MessageDDL) bool { - return ddl.Type == model.ActionDropTable -} - -func (l *LogClient) doDBDDLJob(ctx context.Context, ddls []string) error { - if len(ddls) == 0 { - log.Info("no ddls to restore") - return nil - } - - for _, path := range ddls { - data, err := l.restoreClient.storage.ReadFile(ctx, path) - if err != nil { - return errors.Trace(err) - } - eventDecoder, err := cdclog.NewJSONEventBatchDecoder(data) - if err != nil { - return errors.Trace(err) - } - for eventDecoder.HasNext() { - item, err := eventDecoder.NextEvent(cdclog.DDL) - if err != nil { - return errors.Trace(err) - } - ddl := item.Data.(*cdclog.MessageDDL) - log.Debug("[doDBDDLJob] parse ddl", zap.String("query", ddl.Query)) - if l.isDBRelatedDDL(ddl) && l.tsInRange(item.TS) { - err = l.restoreClient.db.se.Execute(ctx, ddl.Query) - if err != nil { - log.Error("[doDBDDLJob] exec ddl failed", - zap.String("query", ddl.Query), zap.Error(err)) - return errors.Trace(err) - } - if ddl.Type == model.ActionDropSchema { - // store the drop schema ts, and then we need filter evetns which ts is small than this. - l.dropTSMap.Store(item.Schema, item.TS) - } - } - } - } - return nil -} - -// NeedRestoreRowChange determine whether to collect this file by ts range. -func (l *LogClient) NeedRestoreRowChange(fileName string) (bool, error) { - if fileName == logPrefix { - // this file name appeared when file sink enabled - return true, nil - } - names := strings.Split(fileName, ".") - if len(names) != 2 { - log.Warn("found wrong format of row changes file", zap.String("file", fileName)) - return false, nil - } - if names[0] != logPrefix { - log.Warn("file doesn't start with row changes file", zap.String("file", fileName)) - return false, nil - } - ts, err := strconv.ParseUint(names[1], 10, 64) - if err != nil { - return false, errors.Trace(err) - } - if l.maybeTSInRange(ts) { - return true, nil - } - log.Info("filter file by ts", zap.String("name", fileName), zap.Uint64("ts", ts)) - return false, nil -} - -func (l *LogClient) collectRowChangeFiles(ctx context.Context) (map[int64][]string, error) { - // we should collect all related tables row change files - // by log meta info and by given table filter - rowChangeFiles := make(map[int64][]string) - - // need collect restore tableIDs - tableIDs := make([]int64, 0, len(l.meta.Names)) - - // we need remove duplicate table name in collection. - // when a table create and drop and create again. 
- // then we will have two different table id with same tables. - // we should keep the latest table id(larger table id), and filter the old one. - nameIDMap := make(map[string]int64) - for tableID, name := range l.meta.Names { - if tid, ok := nameIDMap[name]; ok { - if tid < tableID { - nameIDMap[name] = tableID - } - } else { - nameIDMap[name] = tableID - } - } - for name, tableID := range nameIDMap { - schema, table := ParseQuoteName(name) - if !l.tableFilter.MatchTable(schema, table) { - log.Info("filter tables", zap.String("schema", schema), - zap.String("table", table), zap.Int64("tableID", tableID)) - continue - } - tableIDs = append(tableIDs, tableID) - } - - for _, tID := range tableIDs { - tableID := tID - // FIXME update log meta logic here - dir := fmt.Sprintf("%s%d", tableLogPrefix, tableID) - opt := &storage.WalkOption{ - SubDir: dir, - ListCount: -1, - } - err := l.restoreClient.storage.WalkDir(ctx, opt, func(path string, size int64) error { - fileName := filepath.Base(path) - shouldRestore, err := l.NeedRestoreRowChange(fileName) - if err != nil { - return errors.Trace(err) - } - if shouldRestore { - rowChangeFiles[tableID] = append(rowChangeFiles[tableID], path) - } - return nil - }) - if err != nil { - return nil, errors.Trace(err) - } - } - - // sort file in order - for tID, files := range rowChangeFiles { - sortFiles := files - sort.Slice(sortFiles, func(i, j int) bool { - if filepath.Base(sortFiles[j]) == logPrefix { - return true - } - return sortFiles[i] < sortFiles[j] - }) - rowChangeFiles[tID] = sortFiles - } - - return rowChangeFiles, nil -} - -func (l *LogClient) writeRows(ctx context.Context, kvs kv.Pairs) error { - log.Info("writeRows", zap.Int("kv count", len(kvs))) - if len(kvs) == 0 { - // shouldn't happen - log.Warn("not rows to write") - return nil - } - - // stable sort kvs in memory - sort.SliceStable(kvs, func(i, j int) bool { - return bytes.Compare(kvs[i].Key, kvs[j].Key) < 0 - }) - - // remove duplicate keys, and keep the last one - newKvs := make([]kv.Pair, 0, len(kvs)) - for i := 0; i < len(kvs); i++ { - if i == len(kvs)-1 { - newKvs = append(newKvs, kvs[i]) - break - } - if bytes.Equal(kvs[i].Key, kvs[i+1].Key) { - // skip this one - continue - } - newKvs = append(newKvs, kvs[i]) - } - - remainRange := newSyncdRanges() - remainRange.add(Range{ - Start: newKvs[0].Key, - End: kv.NextKey(newKvs[len(newKvs)-1].Key), - }) - iterProducer := kv.NewSimpleKVIterProducer(newKvs) - for { - remain := remainRange.take() - if len(remain) == 0 { - log.Info("writeRows finish") - break - } - eg, ectx := errgroup.WithContext(ctx) - for _, r := range remain { - rangeReplica := r - l.ingester.WorkerPool.ApplyOnErrorGroup(eg, func() error { - err := l.ingester.writeAndIngestByRange(ectx, iterProducer, rangeReplica.Start, rangeReplica.End, remainRange) - if err != nil { - log.Warn("writeRows failed with range", zap.Any("range", rangeReplica), zap.Error(err)) - return errors.Trace(err) - } - return nil - }) - } - if err := eg.Wait(); err != nil { - return errors.Trace(err) - } - log.Info("writeRows ranges unfinished, retry it", zap.Int("remain ranges", len(remain))) - } - return nil -} - -func (l *LogClient) reloadTableMeta(dom *domain.Domain, tableID int64, item *cdclog.SortItem) error { - err := dom.Reload() - if err != nil { - return errors.Trace(err) - } - // find tableID for this table on cluster - newTableID := l.tableBuffers[tableID].TableID() - var ( - newTableInfo titable.Table - ok bool - ) - if newTableID != 0 { - newTableInfo, ok = 
dom.InfoSchema().TableByID(newTableID) - if !ok { - log.Error("[restoreFromPuller] can't get table info from dom by tableID", - zap.Int64("backup table id", tableID), - zap.Int64("restore table id", newTableID), - ) - return errors.Trace(err) - } - } else { - // fall back to use schema table get info - newTableInfo, err = dom.InfoSchema().TableByName( - model.NewCIStr(item.Schema), model.NewCIStr(item.Table)) - if err != nil { - log.Error("[restoreFromPuller] can't get table info from dom by table name", - zap.Int64("backup table id", tableID), - zap.Int64("restore table id", newTableID), - zap.String("restore table name", item.Table), - zap.String("restore schema name", item.Schema), - ) - return errors.Trace(err) - } - } - - dbInfo, ok := dom.InfoSchema().SchemaByName(model.NewCIStr(item.Schema)) - if !ok { - return errors.Annotatef(berrors.ErrRestoreSchemaNotExists, "schema %s", item.Schema) - } - allocs := autoid.NewAllocatorsFromTblInfo(dom.Store(), dbInfo.ID, newTableInfo.Meta()) - - // reload - l.tableBuffers[tableID].ReloadMeta(newTableInfo, allocs) - log.Debug("reload table meta for table", - zap.Int64("backup table id", tableID), - zap.Int64("restore table id", newTableID), - zap.String("restore table name", item.Table), - zap.String("restore schema name", item.Schema), - zap.Any("allocator", len(allocs)), - zap.Any("auto", newTableInfo.Meta().GetAutoIncrementColInfo()), - ) - return nil -} - -func (l *LogClient) applyKVChanges(ctx context.Context, tableID int64) error { - log.Info("apply kv changes to tikv", - zap.Any("table", tableID), - ) - dataKVs := kv.Pairs{} - indexKVs := kv.Pairs{} - - tableBuffer := l.tableBuffers[tableID] - if tableBuffer.IsEmpty() { - log.Warn("no kv changes to apply") - return nil - } - - var dataChecksum, indexChecksum kv.Checksum - for _, p := range tableBuffer.KvPairs { - p.ClassifyAndAppend(&dataKVs, &dataChecksum, &indexKVs, &indexChecksum) - } - - err := l.writeRows(ctx, dataKVs) - if err != nil { - return errors.Trace(err) - } - dataKVs = dataKVs.Clear() - - err = l.writeRows(ctx, indexKVs) - if err != nil { - return errors.Trace(err) - } - indexKVs = indexKVs.Clear() - - tableBuffer.Clear() - - return nil -} - -func (l *LogClient) restoreTableFromPuller( - ctx context.Context, - tableID int64, - puller *cdclog.EventPuller, - dom *domain.Domain) error { - for { - item, err := puller.PullOneEvent(ctx) - if err != nil { - return errors.Trace(err) - } - if item == nil { - log.Info("[restoreFromPuller] nothing in this puller, we should stop and flush", - zap.Int64("table id", tableID)) - err = l.applyKVChanges(ctx, tableID) - if err != nil { - return errors.Trace(err) - } - return nil - } - log.Debug("[restoreFromPuller] next event", zap.Any("item", item), zap.Int64("table id", tableID)) - if l.startTS > item.TS { - log.Debug("[restoreFromPuller] item ts is smaller than start ts, skip this item", - zap.Uint64("start ts", l.startTS), - zap.Uint64("end ts", l.endTS), - zap.Uint64("item ts", item.TS), - zap.Int64("table id", tableID)) - continue - } - if l.endTS < item.TS { - log.Warn("[restoreFromPuller] ts is larger than end ts, we should stop and flush", - zap.Uint64("start ts", l.startTS), - zap.Uint64("end ts", l.endTS), - zap.Uint64("item ts", item.TS), - zap.Int64("table id", tableID)) - err = l.applyKVChanges(ctx, tableID) - if err != nil { - return errors.Trace(err) - } - return nil - } - - if l.shouldFilter(item) { - log.Debug("[restoreFromPuller] filter item because later drop schema will affect on this item", - zap.Any("item", item), - 
zap.Int64("table id", tableID)) - err = l.applyKVChanges(ctx, tableID) - if err != nil { - return errors.Trace(err) - } - continue - } - - switch item.ItemType { - case cdclog.DDL: - name := l.meta.Names[tableID] - schema, table := ParseQuoteName(name) - ddl := item.Data.(*cdclog.MessageDDL) - // ddl not influence on this schema/table - if !(schema == item.Schema && (table == item.Table || l.isDBRelatedDDL(ddl))) { - log.Info("[restoreFromPuller] meet unrelated ddl, and continue pulling", - zap.String("item table", item.Table), - zap.String("table", table), - zap.String("item schema", item.Schema), - zap.String("schema", schema), - zap.Int64("backup table id", tableID), - zap.String("query", ddl.Query), - zap.Int64("table id", tableID)) - continue - } - - // database level ddl job has been executed at the beginning - if l.isDBRelatedDDL(ddl) { - log.Debug("[restoreFromPuller] meet database level ddl, continue pulling", - zap.String("ddl", ddl.Query), - zap.Int64("table id", tableID)) - continue - } - - // wait all previous kvs ingest finished - err = l.applyKVChanges(ctx, tableID) - if err != nil { - return errors.Trace(err) - } - - log.Debug("[restoreFromPuller] execute ddl", zap.String("ddl", ddl.Query)) - - l.ddlLock.Lock() - err = l.restoreClient.db.se.Execute(ctx, fmt.Sprintf("use %s", item.Schema)) - if err != nil { - return errors.Trace(err) - } - - err = l.restoreClient.db.se.Execute(ctx, ddl.Query) - if err != nil { - return errors.Trace(err) - } - l.ddlLock.Unlock() - - // if table dropped, we will pull next event to see if this table will create again. - // with next create table ddl, we can do reloadTableMeta. - if l.isDropTable(ddl) { - log.Info("[restoreFromPuller] skip reload because this is a drop table ddl", - zap.String("ddl", ddl.Query)) - l.tableBuffers[tableID].ResetTableInfo() - continue - } - err = l.reloadTableMeta(dom, tableID, item) - if err != nil { - return errors.Trace(err) - } - case cdclog.RowChanged: - if l.tableBuffers[tableID].TableInfo() == nil { - err = l.reloadTableMeta(dom, tableID, item) - if err != nil { - // shouldn't happen - return errors.Trace(err) - } - } - err = l.tableBuffers[tableID].Append(item) - if err != nil { - return errors.Trace(err) - } - if l.tableBuffers[tableID].ShouldApply() { - err = l.applyKVChanges(ctx, tableID) - if err != nil { - return errors.Trace(err) - } - } - } - } -} - -func (l *LogClient) restoreTables(ctx context.Context, dom *domain.Domain) error { - // 1. decode cdclog with in ts range - // 2. dispatch cdclog events to table level concurrently - // a. encode row changed files to kvpairs and ingest into tikv - // b. exec ddl - log.Debug("start restore tables") - workerPool := utils.NewWorkerPool(l.concurrencyCfg.Concurrency, "table log restore") - eg, ectx := errgroup.WithContext(ctx) - for tableID, puller := range l.eventPullers { - pullerReplica := puller - tableIDReplica := tableID - workerPool.ApplyOnErrorGroup(eg, func() error { - return l.restoreTableFromPuller(ectx, tableIDReplica, pullerReplica, dom) - }) - } - return eg.Wait() -} - -// RestoreLogData restore specify log data from storage. -func (l *LogClient) RestoreLogData(ctx context.Context, dom *domain.Domain) error { - // 1. Retrieve log data from storage - // 2. Find proper data by TS range - // 3. 
Encode and ingest data to tikv - - // parse meta file - data, err := l.restoreClient.storage.ReadFile(ctx, metaFile) - if err != nil { - return errors.Trace(err) - } - err = json.Unmarshal(data, l.meta) - if err != nil { - return errors.Trace(err) - } - log.Info("get meta from storage", zap.Binary("data", data)) - - if l.startTS > l.meta.GlobalResolvedTS { - return errors.Annotatef(berrors.ErrRestoreRTsConstrain, - "start ts:%d is greater than resolved ts:%d", l.startTS, l.meta.GlobalResolvedTS) - } - if l.endTS > l.meta.GlobalResolvedTS { - log.Info("end ts is greater than resolved ts,"+ - " to keep consistency we only recover data until resolved ts", - zap.Uint64("end ts", l.endTS), - zap.Uint64("resolved ts", l.meta.GlobalResolvedTS)) - l.endTS = l.meta.GlobalResolvedTS - } - - // collect ddl files - ddlFiles, err := l.collectDDLFiles(ctx) - if err != nil { - return errors.Trace(err) - } - - log.Info("collect ddl files", zap.Any("files", ddlFiles)) - - err = l.doDBDDLJob(ctx, ddlFiles) - if err != nil { - return errors.Trace(err) - } - log.Debug("db level ddl executed") - - // collect row change files - rowChangesFiles, err := l.collectRowChangeFiles(ctx) - if err != nil { - return errors.Trace(err) - } - - log.Info("collect row changed files", zap.Any("files", rowChangesFiles)) - - // create event puller to apply changes concurrently - for tableID, files := range rowChangesFiles { - name := l.meta.Names[tableID] - schema, table := ParseQuoteName(name) - log.Info("create puller for table", - zap.Int64("table id", tableID), - zap.String("schema", schema), - zap.String("table", table), - ) - l.eventPullers[tableID], err = cdclog.NewEventPuller(ctx, schema, table, ddlFiles, files, l.restoreClient.storage) - if err != nil { - return errors.Trace(err) - } - // use table name to get table info - var tableInfo titable.Table - var allocs autoid.Allocators - infoSchema := dom.InfoSchema() - if infoSchema.TableExists(model.NewCIStr(schema), model.NewCIStr(table)) { - tableInfo, err = infoSchema.TableByName(model.NewCIStr(schema), model.NewCIStr(table)) - if err != nil { - return errors.Trace(err) - } - dbInfo, ok := dom.InfoSchema().SchemaByName(model.NewCIStr(schema)) - if !ok { - return errors.Annotatef(berrors.ErrRestoreSchemaNotExists, "schema %s", schema) - } - allocs = autoid.NewAllocatorsFromTblInfo(dom.Store(), dbInfo.ID, tableInfo.Meta()) - } - - l.tableBuffers[tableID] = cdclog.NewTableBuffer(tableInfo, allocs, - l.concurrencyCfg.BatchFlushKVPairs, l.concurrencyCfg.BatchFlushKVSize) - } - // restore files - return l.restoreTables(ctx, dom) -} diff --git a/br/pkg/restore/log_client_test.go b/br/pkg/restore/log_client_test.go deleted file mode 100644 index 24f808faaf05b..0000000000000 --- a/br/pkg/restore/log_client_test.go +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. - -package restore_test - -import ( - "context" - "math" - - . 
"github.com/pingcap/check" - filter "github.com/pingcap/tidb-tools/pkg/table-filter" - "github.com/pingcap/tidb/br/pkg/gluetidb" - "github.com/pingcap/tidb/br/pkg/mock" - "github.com/pingcap/tidb/br/pkg/restore" - "github.com/pingcap/tidb/util/testleak" -) - -type testLogRestoreSuite struct { - mock *mock.Cluster - - client *restore.LogClient -} - -var _ = Suite(&testLogRestoreSuite{}) - -func (s *testLogRestoreSuite) SetUpSuite(c *C) { - var err error - s.mock, err = mock.NewCluster() - c.Assert(err, IsNil) - restoreClient, err := restore.NewRestoreClient( - gluetidb.New(), s.mock.PDClient, s.mock.Storage, nil, defaultKeepaliveCfg) - c.Assert(err, IsNil) - - s.client, err = restore.NewLogRestoreClient( - context.Background(), - restoreClient, - 0, - math.MaxInt64, - filter.NewSchemasFilter("test"), - 8, - 16, - 5<<20, - 16, - ) - c.Assert(err, IsNil) -} - -func (s *testLogRestoreSuite) TearDownSuite(c *C) { - testleak.AfterTest(c)() -} - -func (s *testLogRestoreSuite) TestTsInRange(c *C) { - fileName1 := "cdclog.1" - s.client.ResetTSRange(1, 2) - collected, err := s.client.NeedRestoreRowChange(fileName1) - c.Assert(err, IsNil) - c.Assert(collected, IsTrue) - - // cdclog.3 may have events in [1, 2] - // so we should collect it. - fileName2 := "cdclog.3" - s.client.ResetTSRange(1, 2) - collected, err = s.client.NeedRestoreRowChange(fileName2) - c.Assert(err, IsNil) - c.Assert(collected, IsTrue) - - fileName3 := "cdclog.3" - s.client.ResetTSRange(4, 5) - collected, err = s.client.NeedRestoreRowChange(fileName3) - c.Assert(err, IsNil) - c.Assert(collected, IsFalse) - - // format cdclog will collect, because file sink will generate cdclog for streaming write. - fileName4 := "cdclog" - collected, err = s.client.NeedRestoreRowChange(fileName4) - c.Assert(err, IsNil) - c.Assert(collected, IsTrue) - - for _, fileName := range []string{"cdclog.3.1", "cdclo.3"} { - // wrong format won't collect - collected, err = s.client.NeedRestoreRowChange(fileName) - c.Assert(err, IsNil) - c.Assert(collected, IsFalse) - } - - // format cdclog will collect, because file sink will generate cdclog for streaming write. - ddlFile := "ddl.18446744073709551615" - collected, err = s.client.NeedRestoreDDL(ddlFile) - c.Assert(err, IsNil) - c.Assert(collected, IsTrue) - - for _, fileName := range []string{"ddl", "dld.1"} { - // wrong format won't collect - collected, err = s.client.NeedRestoreDDL(fileName) - c.Assert(err, IsNil) - c.Assert(collected, IsFalse) - } - - s.client.ResetTSRange(424839867765096449, 424839886560821249) - // ddl suffix records the first event's commit ts - - // the file name include the end ts, collect it.(maxUint64 - 424839886560821249) - ddlFile = "ddl.18021904187148730366" - collected, err = s.client.NeedRestoreDDL(ddlFile) - c.Assert(err, IsNil) - c.Assert(collected, IsTrue) - - // the file name include the start ts, collect it.(maxUint64 - 424839867765096449) - ddlFile = "ddl.18021904205944455166" - collected, err = s.client.NeedRestoreDDL(ddlFile) - c.Assert(err, IsNil) - c.Assert(collected, IsTrue) - - // the file first event's ts is smaller than the start ts, collect it. - // because we only know this file's first event not in TSRange. - // FIXME find a unified logic for collection. - ddlFile = "ddl.18021904205944455167" - collected, err = s.client.NeedRestoreDDL(ddlFile) - c.Assert(err, IsNil) - c.Assert(collected, IsTrue) - - // the file first event's ts is large than end ts, skip it. 
- ddlFile = "ddl.18021904187148730365" - collected, err = s.client.NeedRestoreDDL(ddlFile) - c.Assert(err, IsNil) - c.Assert(collected, IsFalse) -} diff --git a/br/pkg/task/restore_log.go b/br/pkg/task/restore_log.go deleted file mode 100644 index 26a8bdae0add0..0000000000000 --- a/br/pkg/task/restore_log.go +++ /dev/null @@ -1,143 +0,0 @@ -// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. - -package task - -import ( - "context" - - "github.com/pingcap/errors" - "github.com/pingcap/tidb/br/pkg/glue" - "github.com/pingcap/tidb/br/pkg/restore" - "github.com/pingcap/tidb/br/pkg/storage" - "github.com/spf13/cobra" - "github.com/spf13/pflag" -) - -const ( - flagStartTS = "start-ts" - flagEndTS = "end-ts" - flagBatchWriteCount = "write-kvs" - flagBatchFlushCount = "flush-kvs" - - // represents kv flush to storage for each table. - defaultFlushKV = 5120 - // represents kv size flush to storage for each table. - defaultFlushKVSize = 5 << 20 - // represents kv that write to TiKV once at at time. - defaultWriteKV = 1280 -) - -// LogRestoreConfig is the configuration specific for restore tasks. -type LogRestoreConfig struct { - Config - - StartTS uint64 - EndTS uint64 - - BatchFlushKVPairs int - BatchFlushKVSize int64 - BatchWriteKVPairs int -} - -// DefineLogRestoreFlags defines common flags for the backup command. -func DefineLogRestoreFlags(command *cobra.Command) { - command.Flags().Uint64P(flagStartTS, "", 0, "restore log start ts") - command.Flags().Uint64P(flagEndTS, "", 0, "restore log end ts") - - command.Flags().Uint64P(flagBatchWriteCount, "", 0, "the kv count that write to TiKV once at a time") - command.Flags().Uint64P(flagBatchFlushCount, "", 0, "the kv count that flush from memory to TiKV") -} - -// ParseFromFlags parses the restore-related flags from the flag set. -func (cfg *LogRestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error { - var err error - cfg.StartTS, err = flags.GetUint64(flagStartTS) - if err != nil { - return errors.Trace(err) - } - cfg.EndTS, err = flags.GetUint64(flagEndTS) - if err != nil { - return errors.Trace(err) - } - err = cfg.Config.ParseFromFlags(flags) - if err != nil { - return errors.Trace(err) - } - return nil -} - -// adjustRestoreConfig is use for BR(binary) and BR in TiDB. -// When new config was add and not included in parser. -// we should set proper value in this function. -// so that both binary and TiDB will use same default value. -func (cfg *LogRestoreConfig) adjustRestoreConfig() { - cfg.adjust() - - if cfg.Config.Concurrency == 0 { - cfg.Config.Concurrency = defaultRestoreConcurrency - } - if cfg.BatchFlushKVPairs == 0 { - cfg.BatchFlushKVPairs = defaultFlushKV - } - if cfg.BatchWriteKVPairs == 0 { - cfg.BatchWriteKVPairs = defaultWriteKV - } - if cfg.BatchFlushKVSize == 0 { - cfg.BatchFlushKVSize = defaultFlushKVSize - } - // write kv count doesn't have to excceed flush kv count. - if cfg.BatchWriteKVPairs > cfg.BatchFlushKVPairs { - cfg.BatchWriteKVPairs = cfg.BatchFlushKVPairs - } -} - -// RunLogRestore starts a restore task inside the current goroutine. -func RunLogRestore(c context.Context, g glue.Glue, cfg *LogRestoreConfig) error { - cfg.adjustRestoreConfig() - - ctx, cancel := context.WithCancel(c) - defer cancel() - - // Restore needs domain to do DDL. 
- needDomain := true - mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain) - if err != nil { - return errors.Trace(err) - } - defer mgr.Close() - - u, err := storage.ParseBackend(cfg.Storage, &cfg.BackendOptions) - if err != nil { - return errors.Trace(err) - } - keepaliveCfg := GetKeepalive(&cfg.Config) - keepaliveCfg.PermitWithoutStream = true - client, err := restore.NewRestoreClient(g, mgr.GetPDClient(), mgr.GetStorage(), mgr.GetTLSConfig(), keepaliveCfg) - if err != nil { - return errors.Trace(err) - } - defer client.Close() - - opts := storage.ExternalStorageOptions{ - NoCredentials: cfg.NoCreds, - SendCredentials: cfg.SendCreds, - SkipCheckPath: cfg.SkipCheckPath, - } - if err = client.SetStorage(ctx, u, &opts); err != nil { - return errors.Trace(err) - } - - err = client.LoadRestoreStores(ctx) - if err != nil { - return errors.Trace(err) - } - - logClient, err := restore.NewLogRestoreClient( - ctx, client, cfg.StartTS, cfg.EndTS, cfg.TableFilter, uint(cfg.Concurrency), - cfg.BatchFlushKVPairs, cfg.BatchFlushKVSize, cfg.BatchWriteKVPairs) - if err != nil { - return errors.Trace(err) - } - - return logClient.RestoreLogData(ctx, mgr.GetDomain()) -}
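
The patch above removes NeedRestoreRowChange together with the rest of the cdclog restore path. For reference, a minimal standalone sketch of the file-name filter it implemented, assuming the numeric suffix of a "cdclog.<ts>" file is a commit ts near the end of that file; the patch does not show maybeTSInRange, so the ts comparison below is a simplified stand-in, and nothing here is part of the patch itself.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

const logPrefix = "cdclog" // prefix the removed client used for row-change files

// needRestoreRowChange mirrors the file-name check in the removed
// LogClient.NeedRestoreRowChange: the bare "cdclog" file written by the
// streaming file sink is always collected, "cdclog.<ts>" is collected when it
// may still contain events at or after startTS, and malformed names are
// skipped (the original logged a warning for them). The comparison against
// startTS is a simplification of maybeTSInRange, which the patch does not show.
func needRestoreRowChange(fileName string, startTS uint64) (bool, error) {
	if fileName == logPrefix {
		return true, nil
	}
	parts := strings.Split(fileName, ".")
	if len(parts) != 2 || parts[0] != logPrefix {
		return false, nil
	}
	ts, err := strconv.ParseUint(parts[1], 10, 64)
	if err != nil {
		return false, err
	}
	// A file whose suffix is at or after startTS may still hold events in range.
	return ts >= startTS, nil
}

func main() {
	// With a restore range starting at ts 4, "cdclog.3" is skipped while the
	// bare streaming file and well-formed later files are kept, matching the
	// behaviour exercised by the deleted test.
	for _, name := range []string{"cdclog", "cdclog.3", "cdclog.3.1", "cdclo.3"} {
		ok, err := needRestoreRowChange(name, 4)
		fmt.Println(name, ok, err)
	}
}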
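
The deleted test relies on ddl file names whose suffix is maxUint64 minus the first event's commit ts, and the filter at the top of this hunk collects a ddl file when the decoded ts is not larger than endTS. A small sketch of that decoding, under the assumption stated in the test comments; decodeDDLFileTS is an illustrative name, not a function from the patch.

package main

import (
	"fmt"
	"math"
	"strconv"
	"strings"
)

// decodeDDLFileTS recovers the first event's commit ts from a "ddl.<suffix>"
// file name, assuming the suffix is maxUint64 minus that ts, as the removed
// test's comments state.
func decodeDDLFileTS(fileName string) (uint64, error) {
	parts := strings.Split(fileName, ".")
	if len(parts) != 2 || parts[0] != "ddl" {
		return 0, fmt.Errorf("unexpected ddl file name %q", fileName)
	}
	suffix, err := strconv.ParseUint(parts[1], 10, 64)
	if err != nil {
		return 0, err
	}
	return math.MaxUint64 - suffix, nil
}

func main() {
	// This is one of the file names from the deleted test; its first event's
	// ts equals the end of the restore range there, so NeedRestoreDDL kept it.
	ts, err := decodeDDLFileTS("ddl.18021904187148730366")
	fmt.Println(ts, err) // 424839886560821249 <nil>
}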
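
The removed writeRows prepared KV pairs before ingest by stable-sorting them on the key and keeping only the last write for each key. A self-contained sketch of that step, with Pair standing in for the kv.Pair type used by the deleted code.

package main

import (
	"bytes"
	"fmt"
	"sort"
)

// Pair is a stand-in for kv.Pair from the removed code.
type Pair struct {
	Key []byte
	Val []byte
}

// dedupKVs stable-sorts pairs by key and keeps only the last pair appended
// for each key, mirroring the prepare step writeRows performed before ingest.
func dedupKVs(kvs []Pair) []Pair {
	// Stable sort preserves the original append order among equal keys,
	// so the most recent change for a key ends up last in its run.
	sort.SliceStable(kvs, func(i, j int) bool {
		return bytes.Compare(kvs[i].Key, kvs[j].Key) < 0
	})
	out := make([]Pair, 0, len(kvs))
	for i := 0; i < len(kvs); i++ {
		if i < len(kvs)-1 && bytes.Equal(kvs[i].Key, kvs[i+1].Key) {
			// Older value for the same key; the later one wins.
			continue
		}
		out = append(out, kvs[i])
	}
	return out
}

func main() {
	kvs := []Pair{
		{Key: []byte("a"), Val: []byte("1")},
		{Key: []byte("b"), Val: []byte("1")},
		{Key: []byte("a"), Val: []byte("2")}, // later write to "a" should win
	}
	for _, p := range dedupKVs(kvs) {
		fmt.Printf("%s=%s\n", p.Key, p.Val) // a=2, b=1
	}
}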
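
The removed adjustRestoreConfig filled in batching defaults and clamped the per-write KV count so it never exceeds the flush count. A pared-down sketch of those rules, using the defaults from the deleted restore_log.go; logRestoreBatching is a hypothetical stand-in for the batching fields of LogRestoreConfig.

package main

import "fmt"

// Defaults taken from the removed br/pkg/task/restore_log.go.
const (
	defaultFlushKV     = 5120    // kv pairs flushed to storage per table
	defaultFlushKVSize = 5 << 20 // kv bytes flushed to storage per table
	defaultWriteKV     = 1280    // kv pairs written to TiKV at a time
)

// logRestoreBatching holds only the batching knobs, for illustration.
type logRestoreBatching struct {
	BatchFlushKVPairs int
	BatchFlushKVSize  int64
	BatchWriteKVPairs int
}

// adjust fills unset values with the defaults and clamps the per-write batch
// so a single write to TiKV never exceeds what is flushed from memory,
// mirroring the rules in the removed adjustRestoreConfig.
func (c *logRestoreBatching) adjust() {
	if c.BatchFlushKVPairs == 0 {
		c.BatchFlushKVPairs = defaultFlushKV
	}
	if c.BatchWriteKVPairs == 0 {
		c.BatchWriteKVPairs = defaultWriteKV
	}
	if c.BatchFlushKVSize == 0 {
		c.BatchFlushKVSize = defaultFlushKVSize
	}
	if c.BatchWriteKVPairs > c.BatchFlushKVPairs {
		c.BatchWriteKVPairs = c.BatchFlushKVPairs
	}
}

func main() {
	cfg := logRestoreBatching{BatchWriteKVPairs: 8192}
	cfg.adjust()
	fmt.Printf("%+v\n", cfg) // write batch clamped to the flush batch (5120)
}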