Skip to content

Commit

Permalink
Fix flaky periodic tests
Browse files Browse the repository at this point in the history
Signed-off-by: Allen Ray <alray@redhat.com>
  • Loading branch information
dusk125 committed Mar 14, 2024
1 parent 8292553 commit 1083ee5
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 46 deletions.
35 changes: 35 additions & 0 deletions client/pkg/testutil/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,38 @@ func newLenErr(expected int, actual int) error {
s := fmt.Sprintf("len(actions) = %d, expected >= %d", actual, expected)
return errors.New(s)
}

type recorderSync struct {
ch chan Action
}

func NewRecorderSync() Recorder {
return &recorderSync{ch: make(chan Action)}
}

func (r *recorderSync) Record(a Action) {
r.ch <- a
}

func (r *recorderSync) Action() (acts []Action) {
for {
select {
case act := <-r.ch:
acts = append(acts, act)
default:
return acts
}
}
}

func (r *recorderSync) Chan() <-chan Action {
return r.ch
}

func (r *recorderSync) Wait(n int) ([]Action, error) {
acts := make([]Action, 0, n)
for i := 0; i < n; i++ {
acts = append(acts, <-r.ch)
}
return acts, nil
}
87 changes: 41 additions & 46 deletions server/etcdserver/api/v3compactor/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,35 @@ import (
"time"

"github.com/jonboulle/clockwork"
"go.uber.org/zap/zaptest"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/testutil"
"go.uber.org/zap/zaptest"
)

func advanceForDuration(fc clockwork.FakeClock, rg *fakeRevGetter, duration, interval time.Duration) {
for i := 0; i < int(duration/interval); i++ {
// wait for periodic to call rev
_, _ = rg.Wait(1)
// Block until the periodic is waiting on the clock
fc.BlockUntil(1)
fc.Advance(interval)
}
}

func TestPeriodicHourly(t *testing.T) {
retentionHours := 2
retentionDuration := time.Duration(retentionHours) * time.Hour

fc := clockwork.NewFakeClock()
// TODO: Do not depand or real time (Recorder.Wait) in unit tests.
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
rg := &fakeRevGetter{testutil.NewRecorderSync(), 0}
compactable := &fakeCompactable{testutil.NewRecorderSync()}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)

tb.Run()
defer tb.Stop()

initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10

// compaction doesn't happen til 2 hours elapse
for i := 0; i < initialIntervals; i++ {
rg.Wait(1)
fc.Advance(tb.getRetryInterval())
}
advanceForDuration(fc, rg, retentionDuration, tb.getRetryInterval())

// very first compaction
a, err := compactable.Wait(1)
Expand All @@ -62,10 +65,7 @@ func TestPeriodicHourly(t *testing.T) {
// now compactor kicks in, every hour
for i := 0; i < 3; i++ {
// advance one hour, one revision for each interval
for j := 0; j < intervalsPerPeriod; j++ {
rg.Wait(1)
fc.Advance(tb.getRetryInterval())
}
advanceForDuration(fc, rg, time.Hour, tb.getRetryInterval())

a, err = compactable.Wait(1)
if err != nil {
Expand All @@ -84,20 +84,15 @@ func TestPeriodicMinutes(t *testing.T) {
retentionDuration := time.Duration(retentionMinutes) * time.Minute

fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
rg := &fakeRevGetter{testutil.NewRecorderSync(), 0}
compactable := &fakeCompactable{testutil.NewRecorderSync()}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)

tb.Run()
defer tb.Stop()

initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10

// compaction doesn't happen til 5 minutes elapse
for i := 0; i < initialIntervals; i++ {
rg.Wait(1)
fc.Advance(tb.getRetryInterval())
}
advanceForDuration(fc, rg, retentionDuration, tb.getRetryInterval())

// very first compaction
a, err := compactable.Wait(1)
Expand All @@ -112,10 +107,7 @@ func TestPeriodicMinutes(t *testing.T) {
// compaction happens at every interval
for i := 0; i < 5; i++ {
// advance 5-minute, one revision for each interval
for j := 0; j < intervalsPerPeriod; j++ {
rg.Wait(1)
fc.Advance(tb.getRetryInterval())
}
advanceForDuration(fc, rg, retentionDuration, tb.getRetryInterval())

a, err := compactable.Wait(1)
if err != nil {
Expand All @@ -132,21 +124,17 @@ func TestPeriodicMinutes(t *testing.T) {
func TestPeriodicPause(t *testing.T) {
fc := clockwork.NewFakeClock()
retentionDuration := time.Hour
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
rg := &fakeRevGetter{testutil.NewRecorderSync(), 0}
compactable := &fakeCompactable{testutil.NewRecorderSync()}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)

tb.Run()
tb.Pause()

n := tb.getRetentions()

start := fc.Now()
// tb will collect 3 hours of revisions but not compact since paused
for i := 0; i < n*3; i++ {
rg.Wait(1)
fc.Advance(tb.getRetryInterval())
}
// t.revs = [21 22 23 24 25 26 27 28 29 30]
advanceForDuration(fc, rg, 3*time.Hour, tb.getRetryInterval())
// t.revs = [20 21 22 23 24 25 26 27 28 29 30]

select {
case a := <-compactable.Chan():
Expand All @@ -156,19 +144,23 @@ func TestPeriodicPause(t *testing.T) {

// tb resumes to being blocked on the clock
tb.Resume()
rg.Wait(1)

_, _ = rg.Wait(1)
fc.BlockUntil(1)
// unblock clock, will kick off a compaction at T=3h6m by retry
fc.Advance(tb.getRetryInterval())

if elapsed := fc.Since(start); elapsed < (3*time.Hour + 6*time.Minute) {
t.Fatalf("expected time elapsed 3h6m, elapsed %v", elapsed)
}

// T=3h6m
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}

// compact the revision from hour 2:06
wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)}
wreq := &pb.CompactionRequest{Revision: tb.revs[0]}
if !reflect.DeepEqual(a[0].Params[0], wreq) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
}
Expand All @@ -179,20 +171,21 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
retentionDuration := time.Duration(retentionMinutes) * time.Minute

fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
rg := &fakeRevGetter{testutil.NewRecorderSync(), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)

tb.Run()
defer tb.Stop()

initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
intervalsPerPeriod := int(retentionDuration / tb.getRetryInterval())

// first compaction happens til 5 minutes elapsed
for i := 0; i < initialIntervals; i++ {
for i := 0; i < intervalsPerPeriod; i++ {
// every time set the same revision with 100
rg.SetRev(int64(100))
rg.Wait(1)
_, _ = rg.Wait(1)
fc.BlockUntil(1)
fc.Advance(tb.getRetryInterval())
}

Expand All @@ -212,7 +205,8 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
for i := 0; i < 5; i++ {
for j := 0; j < intervalsPerPeriod; j++ {
rg.SetRev(int64(100))
rg.Wait(1)
_, _ = rg.Wait(1)
fc.BlockUntil(1)
fc.Advance(tb.getRetryInterval())
}

Expand All @@ -223,8 +217,9 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
}

// when revision changed, compaction is normally
for i := 0; i < initialIntervals; i++ {
rg.Wait(1)
for i := 0; i < tb.getRetentions(); i++ {
_, _ = rg.Wait(1)
fc.BlockUntil(1)
fc.Advance(tb.getRetryInterval())
}

Expand Down

0 comments on commit 1083ee5

Please sign in to comment.