diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel index ec1924d402cd1..8f525b743d07f 100644 --- a/br/pkg/lightning/backend/local/BUILD.bazel +++ b/br/pkg/lightning/backend/local/BUILD.bazel @@ -59,6 +59,7 @@ go_library( "//pkg/util/compress", "//pkg/util/engine", "//pkg/util/hack", + "//pkg/util/intest", "//pkg/util/mathutil", "//pkg/util/ranger", "@com_github_cockroachdb_pebble//:pebble", diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 328476270c884..3153dbb7de6d2 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -61,6 +61,7 @@ import ( "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/util/codec" "github.com/pingcap/tidb/pkg/util/engine" + "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/mathutil" "github.com/tikv/client-go/v2/oracle" tikvclient "github.com/tikv/client-go/v2/tikv" @@ -81,9 +82,6 @@ const ( dialTimeout = 5 * time.Minute maxRetryTimes = 5 defaultRetryBackoffTime = 3 * time.Second - // maxWriteAndIngestRetryTimes is the max retry times for write and ingest. - // A large retry times is for tolerating tikv cluster failures. - maxWriteAndIngestRetryTimes = 30 gRPCKeepAliveTime = 10 * time.Minute gRPCKeepAliveTimeout = 5 * time.Minute @@ -116,6 +114,10 @@ var ( errorEngineClosed = errors.New("engine is closed") maxRetryBackoffSecond = 30 + + // MaxWriteAndIngestRetryTimes is the max retry times for write and ingest. + // A large retry times is for tolerating tikv cluster failures. + MaxWriteAndIngestRetryTimes = 30 ) // ImportClientFactory is factory to create new import client for specific store. 
@@ -1701,8 +1703,19 @@ func (local *Backend) doImport(ctx context.Context, engine common.Engine, region switch job.stage { case regionScanned, wrote: job.retryCount++ - if job.retryCount > maxWriteAndIngestRetryTimes { - firstErr.Set(job.lastRetryableErr) + if job.retryCount > MaxWriteAndIngestRetryTimes { + lastErr := job.lastRetryableErr + intest.Assert(lastErr != nil, "lastRetryableErr should not be nil") + if lastErr == nil { + lastErr = errors.New("retry limit exceeded") + log.FromContext(ctx).Error( + "lastRetryableErr should not be nil", + logutil.Key("startKey", job.keyRange.Start), + logutil.Key("endKey", job.keyRange.End), + zap.Stringer("stage", job.stage), + zap.Error(lastErr)) + } + firstErr.Set(lastErr) workerCancel() job.done(&jobWg) continue diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index e9e639406138f..205a00a831c11 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -2060,7 +2060,7 @@ func TestDoImport(t *testing.T) { { keyRange: common.Range{Start: []byte{'a'}, End: []byte{'b'}}, ingestData: &Engine{}, - retryCount: maxWriteAndIngestRetryTimes - 1, + retryCount: MaxWriteAndIngestRetryTimes - 1, injected: getSuccessInjectedBehaviour(), }, }, @@ -2070,7 +2070,7 @@ func TestDoImport(t *testing.T) { { keyRange: common.Range{Start: []byte{'b'}, End: []byte{'c'}}, ingestData: &Engine{}, - retryCount: maxWriteAndIngestRetryTimes - 1, + retryCount: MaxWriteAndIngestRetryTimes - 1, injected: getSuccessInjectedBehaviour(), }, }, @@ -2080,7 +2080,7 @@ func TestDoImport(t *testing.T) { { keyRange: common.Range{Start: []byte{'c'}, End: []byte{'d'}}, ingestData: &Engine{}, - retryCount: maxWriteAndIngestRetryTimes - 2, + retryCount: MaxWriteAndIngestRetryTimes - 2, injected: []injectedBehaviour{ { write: injectedWriteBehaviour{ @@ -2131,13 +2131,13 @@ func TestRegionJobResetRetryCounter(t *testing.T) { keyRange: common.Range{Start: 
[]byte{'c'}, End: []byte{'c', '2'}}, ingestData: &Engine{}, injected: getNeedRescanWhenIngestBehaviour(), - retryCount: maxWriteAndIngestRetryTimes, + retryCount: MaxWriteAndIngestRetryTimes, }, { keyRange: common.Range{Start: []byte{'c', '2'}, End: []byte{'d'}}, ingestData: &Engine{}, injected: getSuccessInjectedBehaviour(), - retryCount: maxWriteAndIngestRetryTimes, + retryCount: MaxWriteAndIngestRetryTimes, }, }, }, diff --git a/br/pkg/lightning/backend/local/region_job.go b/br/pkg/lightning/backend/local/region_job.go index d074d54264c66..b70c94cf7045a 100644 --- a/br/pkg/lightning/backend/local/region_job.go +++ b/br/pkg/lightning/backend/local/region_job.go @@ -525,6 +525,7 @@ func (local *Backend) ingest(ctx context.Context, j *regionJob) (err error) { log.FromContext(ctx).Warn("meet underlying error, will retry ingest", log.ShortError(err), logutil.SSTMetas(j.writeResult.sstMeta), logutil.Region(j.region.Region), logutil.Leader(j.region.Leader)) + j.lastRetryableErr = err continue } canContinue, err := j.convertStageOnIngestError(resp) @@ -575,6 +576,9 @@ func (local *Backend) checkWriteStall( // doIngest send ingest commands to TiKV based on regionJob.writeResult.sstMeta. // When meet error, it will remove finished sstMetas before return. 
func (local *Backend) doIngest(ctx context.Context, j *regionJob) (*sst.IngestResponse, error) {
+	failpoint.Inject("doIngestFailed", func() {
+		failpoint.Return(nil, errors.New("injected error"))
+	})
 	clientFactory := local.importClientFactory
 	supportMultiIngest := local.supportMultiIngest
 	shouldCheckWriteStall := local.ShouldCheckWriteStall
diff --git a/tests/realtikvtest/addindextest4/ingest_test.go b/tests/realtikvtest/addindextest4/ingest_test.go
index 596697df37824..04aa59a32e6f5 100644
--- a/tests/realtikvtest/addindextest4/ingest_test.go
+++ b/tests/realtikvtest/addindextest4/ingest_test.go
@@ -580,3 +580,41 @@ func TestFirstLitSlowStart(t *testing.T) {
 	}()
 	wg.Wait()
 }
+
+// TestIssue55808 checks that ADD INDEX reports the injected ingest error
+// once doIngest keeps failing and the retry limit is exceeded.
+func TestIssue55808(t *testing.T) {
+	store := realtikvtest.CreateMockStoreAndSetup(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("drop database if exists addindexlit;")
+	tk.MustExec("create database addindexlit;")
+	tk.MustExec("use addindexlit;")
+	tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
+	tk.MustExec("set global tidb_enable_dist_task = off;")
+	tk.MustExec("set global tidb_ddl_error_count_limit = 0")
+
+	// Lower the retry limit so the failing job gives up after one retry,
+	// and restore the package-level value when the test ends.
+	backup := local.MaxWriteAndIngestRetryTimes
+	local.MaxWriteAndIngestRetryTimes = 1
+	t.Cleanup(func() {
+		local.MaxWriteAndIngestRetryTimes = backup
+	})
+
+	tk.MustExec("create table t (a int primary key, b int);")
+
+	for i := 0; i < 4; i++ {
+		tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i*10000, i*10000))
+	}
+
+	// Register the disable step before asserting: require.* aborts the test
+	// on failure, which would otherwise leave the failpoint enabled.
+	const fp = "github.com/pingcap/tidb/br/pkg/lightning/backend/local/doIngestFailed"
+	require.NoError(t, failpoint.Enable(fp, "return()"))
+	t.Cleanup(func() {
+		require.NoError(t, failpoint.Disable(fp))
+	})
+
+	err := tk.ExecToErr("alter table t add index idx(a);")
+	require.ErrorContains(t, err, "injected error")
+}