Skip to content

Commit

Permalink
lightning: fix forget to set lastRetryableErr when ingest RPC fail (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 committed Sep 26, 2024
1 parent b473b4b commit e02c06b
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 10 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ go_library(
"//pkg/util/compress",
"//pkg/util/engine",
"//pkg/util/hack",
"//pkg/util/intest",
"//pkg/util/mathutil",
"//pkg/util/ranger",
"@com_github_cockroachdb_pebble//:pebble",
Expand Down
23 changes: 18 additions & 5 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/engine"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/mathutil"
"github.com/tikv/client-go/v2/oracle"
tikvclient "github.com/tikv/client-go/v2/tikv"
Expand All @@ -81,9 +82,6 @@ const (
dialTimeout = 5 * time.Minute
maxRetryTimes = 5
defaultRetryBackoffTime = 3 * time.Second
// maxWriteAndIngestRetryTimes is the max retry times for write and ingest.
// A large retry times is for tolerating tikv cluster failures.
maxWriteAndIngestRetryTimes = 30

gRPCKeepAliveTime = 10 * time.Minute
gRPCKeepAliveTimeout = 5 * time.Minute
Expand Down Expand Up @@ -116,6 +114,10 @@ var (

errorEngineClosed = errors.New("engine is closed")
maxRetryBackoffSecond = 30

// MaxWriteAndIngestRetryTimes is the max retry times for write and ingest.
// A large retry times is for tolerating tikv cluster failures.
MaxWriteAndIngestRetryTimes = 30
)

// ImportClientFactory is factory to create new import client for specific store.
Expand Down Expand Up @@ -1701,8 +1703,19 @@ func (local *Backend) doImport(ctx context.Context, engine common.Engine, region
switch job.stage {
case regionScanned, wrote:
job.retryCount++
if job.retryCount > maxWriteAndIngestRetryTimes {
firstErr.Set(job.lastRetryableErr)
if job.retryCount > MaxWriteAndIngestRetryTimes {
lastErr := job.lastRetryableErr
intest.Assert(lastErr != nil, "lastRetryableErr should not be nil")
if lastErr == nil {
lastErr = errors.New("retry limit exceeded")
log.FromContext(ctx).Error(
"lastRetryableErr should not be nil",
logutil.Key("startKey", job.keyRange.Start),
logutil.Key("endKey", job.keyRange.End),
zap.Stringer("stage", job.stage),
zap.Error(lastErr))
}
firstErr.Set(lastErr)
workerCancel()
job.done(&jobWg)
continue
Expand Down
10 changes: 5 additions & 5 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2060,7 +2060,7 @@ func TestDoImport(t *testing.T) {
{
keyRange: common.Range{Start: []byte{'a'}, End: []byte{'b'}},
ingestData: &Engine{},
retryCount: maxWriteAndIngestRetryTimes - 1,
retryCount: MaxWriteAndIngestRetryTimes - 1,
injected: getSuccessInjectedBehaviour(),
},
},
Expand All @@ -2070,7 +2070,7 @@ func TestDoImport(t *testing.T) {
{
keyRange: common.Range{Start: []byte{'b'}, End: []byte{'c'}},
ingestData: &Engine{},
retryCount: maxWriteAndIngestRetryTimes - 1,
retryCount: MaxWriteAndIngestRetryTimes - 1,
injected: getSuccessInjectedBehaviour(),
},
},
Expand All @@ -2080,7 +2080,7 @@ func TestDoImport(t *testing.T) {
{
keyRange: common.Range{Start: []byte{'c'}, End: []byte{'d'}},
ingestData: &Engine{},
retryCount: maxWriteAndIngestRetryTimes - 2,
retryCount: MaxWriteAndIngestRetryTimes - 2,
injected: []injectedBehaviour{
{
write: injectedWriteBehaviour{
Expand Down Expand Up @@ -2131,13 +2131,13 @@ func TestRegionJobResetRetryCounter(t *testing.T) {
keyRange: common.Range{Start: []byte{'c'}, End: []byte{'c', '2'}},
ingestData: &Engine{},
injected: getNeedRescanWhenIngestBehaviour(),
retryCount: maxWriteAndIngestRetryTimes,
retryCount: MaxWriteAndIngestRetryTimes,
},
{
keyRange: common.Range{Start: []byte{'c', '2'}, End: []byte{'d'}},
ingestData: &Engine{},
injected: getSuccessInjectedBehaviour(),
retryCount: maxWriteAndIngestRetryTimes,
retryCount: MaxWriteAndIngestRetryTimes,
},
},
},
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/lightning/backend/local/region_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ func (local *Backend) ingest(ctx context.Context, j *regionJob) (err error) {
log.FromContext(ctx).Warn("meet underlying error, will retry ingest",
log.ShortError(err), logutil.SSTMetas(j.writeResult.sstMeta),
logutil.Region(j.region.Region), logutil.Leader(j.region.Leader))
j.lastRetryableErr = err
continue
}
canContinue, err := j.convertStageOnIngestError(resp)
Expand Down Expand Up @@ -575,6 +576,9 @@ func (local *Backend) checkWriteStall(
// doIngest send ingest commands to TiKV based on regionJob.writeResult.sstMeta.
// When meet error, it will remove finished sstMetas before return.
func (local *Backend) doIngest(ctx context.Context, j *regionJob) (*sst.IngestResponse, error) {
failpoint.Inject("doIngestFailed", func() {
failpoint.Return(nil, errors.New("injected error"))
})
clientFactory := local.importClientFactory
supportMultiIngest := local.supportMultiIngest
shouldCheckWriteStall := local.ShouldCheckWriteStall
Expand Down
28 changes: 28 additions & 0 deletions tests/realtikvtest/addindextest4/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,3 +580,31 @@ func TestFirstLitSlowStart(t *testing.T) {
}()
wg.Wait()
}

func TestIssue55808(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("drop database if exists addindexlit;")
tk.MustExec("create database addindexlit;")
tk.MustExec("use addindexlit;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
tk.MustExec("set global tidb_enable_dist_task = off;")
tk.MustExec("set global tidb_ddl_error_count_limit = 0")

backup := local.MaxWriteAndIngestRetryTimes
local.MaxWriteAndIngestRetryTimes = 1
t.Cleanup(func() {
local.MaxWriteAndIngestRetryTimes = backup
})

tk.MustExec("create table t (a int primary key, b int);")

for i := 0; i < 4; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i*10000, i*10000))
}

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/doIngestFailed", "return()"))
err := tk.ExecToErr("alter table t add index idx(a);")
require.ErrorContains(t, err, "injected error")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/doIngestFailed"))
}

0 comments on commit e02c06b

Please sign in to comment.