Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flaky periodic tests #17586

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions client/pkg/testutil/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,38 @@ func newLenErr(expected int, actual int) error {
s := fmt.Sprintf("len(actions) = %d, expected >= %d", actual, expected)
return errors.New(s)
}

// recorderSync is a Recorder implementation that hands every recorded Action
// to its consumer through a channel instead of storing it.
type recorderSync struct {
	// ch carries recorded actions from Record to Action, Wait and Chan.
	// NewRecorderSync creates it unbuffered.
	ch chan Action
}

// NewRecorderSync returns a Recorder backed by an unbuffered channel, so a
// Record call does not complete until some receiver takes the action.
func NewRecorderSync() Recorder {
	actions := make(chan Action)
	return &recorderSync{ch: actions}
}

// Record sends a on the recorder's channel. With the unbuffered channel
// created by NewRecorderSync the send blocks until a receiver (Action, Wait,
// or a reader of Chan) takes the value.
func (r *recorderSync) Record(a Action) {
	r.ch <- a
}

// Action returns, in receive order, every action that can be taken from the
// channel without blocking. It never waits: as soon as no action is
// immediately available it returns what it has collected, which is nil when
// nothing was received.
func (r *recorderSync) Action() (acts []Action) {
	for {
		select {
		case pending := <-r.ch:
			acts = append(acts, pending)
			continue
		default:
			// Nothing is ready to be received right now; stop draining.
		}
		return acts
	}
}

// Chan exposes the recorder's channel as receive-only so callers can select
// on recorded actions directly.
func (r *recorderSync) Chan() <-chan Action {
	var recvOnly <-chan Action = r.ch
	return recvOnly
}
Comment on lines +145 to +166
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation is identical to recorderStream.


// Wait receives exactly n actions from the channel and returns them in the
// order they arrived. There is no timeout: the call blocks until all n
// actions have been recorded. The returned error is always nil.
func (r *recorderSync) Wait(n int) ([]Action, error) {
	received := make([]Action, 0, n)
	for len(received) < n {
		next := <-r.ch
		received = append(received, next)
	}
	return received, nil
}
Comment on lines +168 to +174
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation in #17513 can also resolve this by setting an unlimited timeout.

84 changes: 40 additions & 44 deletions server/etcdserver/api/v3compactor/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,30 @@ import (
"go.etcd.io/etcd/client/pkg/v3/testutil"
)

// advanceForDuration moves the fake clock forward by duration in steps of
// interval (duration/interval steps, truncated). Before each step it waits
// for one recorded call on rg and for one waiter to be blocked on the clock,
// so the clock is only advanced once the periodic loop is ready for it.
func advanceForDuration(fc clockwork.FakeClock, rg *fakeRevGetter, duration, interval time.Duration) {
	steps := int(duration / interval)
	for remaining := steps; remaining > 0; remaining-- {
		// Wait for periodic to call rev. The returned actions and error are
		// deliberately discarded; the call is used for synchronization only.
		_, _ = rg.Wait(1)
		// Block until the periodic is waiting on the clock.
		fc.BlockUntil(1)
		fc.Advance(interval)
	}
}

func TestPeriodicHourly(t *testing.T) {
retentionHours := 2
retentionDuration := time.Duration(retentionHours) * time.Hour

fc := clockwork.NewFakeClock()
// TODO: Do not depend on real time (Recorder.Wait) in unit tests.
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
rg := &fakeRevGetter{testutil.NewRecorderSync(), 0}
compactable := &fakeCompactable{testutil.NewRecorderSync()}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)

tb.Run()
defer tb.Stop()

initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10

// compaction doesn't happen til 2 hours elapse
for i := 0; i < initialIntervals; i++ {
rg.Wait(1)
fc.Advance(tb.getRetryInterval())
}
advanceForDuration(fc, rg, retentionDuration, tb.getRetryInterval())

// very first compaction
a, err := compactable.Wait(1)
Expand All @@ -62,10 +66,7 @@ func TestPeriodicHourly(t *testing.T) {
// now compactor kicks in, every hour
for i := 0; i < 3; i++ {
// advance one hour, one revision for each interval
for j := 0; j < intervalsPerPeriod; j++ {
rg.Wait(1)
fc.Advance(tb.getRetryInterval())
}
advanceForDuration(fc, rg, time.Hour, tb.getRetryInterval())

a, err = compactable.Wait(1)
if err != nil {
Expand All @@ -84,20 +85,15 @@ func TestPeriodicMinutes(t *testing.T) {
retentionDuration := time.Duration(retentionMinutes) * time.Minute

fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
rg := &fakeRevGetter{testutil.NewRecorderSync(), 0}
compactable := &fakeCompactable{testutil.NewRecorderSync()}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)

tb.Run()
defer tb.Stop()

initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10

// compaction doesn't happen til 5 minutes elapse
for i := 0; i < initialIntervals; i++ {
rg.Wait(1)
fc.Advance(tb.getRetryInterval())
}
advanceForDuration(fc, rg, retentionDuration, tb.getRetryInterval())

// very first compaction
a, err := compactable.Wait(1)
Expand All @@ -112,10 +108,7 @@ func TestPeriodicMinutes(t *testing.T) {
// compaction happens at every interval
for i := 0; i < 5; i++ {
// advance 5-minute, one revision for each interval
for j := 0; j < intervalsPerPeriod; j++ {
rg.Wait(1)
fc.Advance(tb.getRetryInterval())
}
advanceForDuration(fc, rg, retentionDuration, tb.getRetryInterval())

a, err := compactable.Wait(1)
if err != nil {
Expand All @@ -132,21 +125,17 @@ func TestPeriodicMinutes(t *testing.T) {
func TestPeriodicPause(t *testing.T) {
fc := clockwork.NewFakeClock()
retentionDuration := time.Hour
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
rg := &fakeRevGetter{testutil.NewRecorderSync(), 0}
compactable := &fakeCompactable{testutil.NewRecorderSync()}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)

tb.Run()
tb.Pause()

n := tb.getRetentions()

start := fc.Now()
// tb will collect 3 hours of revisions but not compact since paused
for i := 0; i < n*3; i++ {
rg.Wait(1)
fc.Advance(tb.getRetryInterval())
}
// t.revs = [21 22 23 24 25 26 27 28 29 30]
advanceForDuration(fc, rg, 3*time.Hour, tb.getRetryInterval())
// t.revs = [20 21 22 23 24 25 26 27 28 29 30]

select {
case a := <-compactable.Chan():
Expand All @@ -156,19 +145,23 @@ func TestPeriodicPause(t *testing.T) {

// tb resumes to being blocked on the clock
tb.Resume()
rg.Wait(1)

_, _ = rg.Wait(1)
fc.BlockUntil(1)
// unblock clock, will kick off a compaction at T=3h6m by retry
fc.Advance(tb.getRetryInterval())

if elapsed := fc.Since(start); elapsed < (3*time.Hour + 6*time.Minute) {
t.Fatalf("expected time elapsed 3h6m, elapsed %v", elapsed)
}

// T=3h6m
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}

// compact the revision from hour 2:06
wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)}
wreq := &pb.CompactionRequest{Revision: tb.revs[0]}
if !reflect.DeepEqual(a[0].Params[0], wreq) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
}
Expand All @@ -179,20 +172,21 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
retentionDuration := time.Duration(retentionMinutes) * time.Minute

fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
rg := &fakeRevGetter{testutil.NewRecorderSync(), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)

tb.Run()
defer tb.Stop()

initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
intervalsPerPeriod := int(retentionDuration / tb.getRetryInterval())

// first compaction happens til 5 minutes elapsed
for i := 0; i < initialIntervals; i++ {
for i := 0; i < intervalsPerPeriod; i++ {
// every time set the same revision with 100
rg.SetRev(int64(100))
rg.Wait(1)
_, _ = rg.Wait(1)
fc.BlockUntil(1)
fc.Advance(tb.getRetryInterval())
}

Expand All @@ -212,7 +206,8 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
for i := 0; i < 5; i++ {
for j := 0; j < intervalsPerPeriod; j++ {
rg.SetRev(int64(100))
rg.Wait(1)
_, _ = rg.Wait(1)
fc.BlockUntil(1)
fc.Advance(tb.getRetryInterval())
}

Expand All @@ -223,8 +218,9 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
}

// once the revision changes, compaction proceeds normally
for i := 0; i < initialIntervals; i++ {
rg.Wait(1)
for i := 0; i < tb.getRetentions(); i++ {
_, _ = rg.Wait(1)
fc.BlockUntil(1)
fc.Advance(tb.getRetryInterval())
}

Expand Down
Loading