From 64d6e48dae23488d9a160ae3b1f5070e3c06228a Mon Sep 17 00:00:00 2001 From: Rodrigo Broggi Date: Fri, 21 Jun 2024 23:51:42 +0200 Subject: [PATCH] issue-740: expand oneTimeJob to support multiple times (#741) --- example_test.go | 11 +++ job.go | 78 ++++++++++++++++---- scheduler_test.go | 182 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 256 insertions(+), 15 deletions(-) diff --git a/example_test.go b/example_test.go index a4baa8f2..478af1f2 100644 --- a/example_test.go +++ b/example_test.go @@ -354,6 +354,17 @@ func ExampleOneTimeJob() { func() {}, ), ) + // run job twice - once in 10 seconds and once in 55 minutes + n := time.Now() + _, _ = s.NewJob( + OneTimeJob( + OneTimeJobStartDateTimes( + n.Add(10*time.Second), + n.Add(55*time.Minute), + ), + ), + NewTask(func() {}), + ) s.Start() } diff --git a/job.go b/job.go index 1f45bf9f..7f1f2c40 100644 --- a/job.go +++ b/job.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "math/rand" + "sort" "strings" "time" @@ -446,35 +447,47 @@ type oneTimeJobDefinition struct { } func (o oneTimeJobDefinition) setup(j *internalJob, _ *time.Location, now time.Time) error { - j.jobSchedule = oneTimeJob{} - if err := o.startAt(j); err != nil { - return err + sortedTimes := o.startAt(j) + sort.Slice(sortedTimes, func(i, j int) bool { + return sortedTimes[i].Before(sortedTimes[j]) + }) + // keep only schedules that are in the future + idx, found := slices.BinarySearchFunc(sortedTimes, now, timeCmp()) + if found { + idx++ } - // in case we are not in the `startImmediately` case, our start-date must be in - // the future according to the scheduler clock - if !j.startImmediately && (j.startTime.IsZero() || j.startTime.Before(now)) { + sortedTimes = sortedTimes[idx:] + if !j.startImmediately && len(sortedTimes) == 0 { return ErrOneTimeJobStartDateTimePast } + j.jobSchedule = oneTimeJob{sortedTimes: sortedTimes} return nil } // OneTimeJobStartAtOption defines when the one time job is run -type OneTimeJobStartAtOption func(*internalJob) error +type OneTimeJobStartAtOption func(*internalJob) []time.Time // OneTimeJobStartImmediately tells the scheduler to run the one time job immediately. func OneTimeJobStartImmediately() OneTimeJobStartAtOption { - return func(j *internalJob) error { + return func(j *internalJob) []time.Time { j.startImmediately = true - return nil + return []time.Time{} } } // OneTimeJobStartDateTime sets the date & time at which the job should run. // This datetime must be in the future (according to the scheduler clock). func OneTimeJobStartDateTime(start time.Time) OneTimeJobStartAtOption { - return func(j *internalJob) error { - j.startTime = start - return nil + return func(j *internalJob) []time.Time { + return []time.Time{start} + } +} + +// OneTimeJobStartDateTimes sets the date & times at which the job should run. +// At least one of the date/times must be in the future (according to the scheduler clock). +func OneTimeJobStartDateTimes(times ...time.Time) OneTimeJobStartAtOption { + return func(j *internalJob) []time.Time { + return times } } @@ -486,6 +499,18 @@ func OneTimeJob(startAt OneTimeJobStartAtOption) JobDefinition { } } +func timeCmp() func(element time.Time, target time.Time) int { + return func(element time.Time, target time.Time) int { + if element.Equal(target) { + return 0 + } + if element.Before(target) { + return -1 + } + return 1 + } +} + // ----------------------------------------------- // ----------------------------------------------- // ----------------- Job Options ----------------- @@ -876,10 +901,33 @@ func (m monthlyJob) nextMonthDayAtTime(lastRun time.Time, days []int, firstPass var _ jobSchedule = (*oneTimeJob)(nil) -type oneTimeJob struct{} +type oneTimeJob struct { + sortedTimes []time.Time +} -func (o oneTimeJob) next(_ time.Time) time.Time { - return time.Time{} +// next finds the next item in a sorted list of times using binary-search. +// +// example: sortedTimes: [2, 4, 6, 8] +// +// lastRun: 1 => [idx=0,found=false] => next is 2 - sorted[idx] idx=0 +// lastRun: 2 => [idx=0,found=true] => next is 4 - sorted[idx+1] idx=1 +// lastRun: 3 => [idx=1,found=false] => next is 4 - sorted[idx] idx=1 +// lastRun: 4 => [idx=1,found=true] => next is 6 - sorted[idx+1] idx=2 +// lastRun: 7 => [idx=3,found=false] => next is 8 - sorted[idx] idx=3 +// lastRun: 8 => [idx=3,found=found] => next is none +// lastRun: 9 => [idx=3,found=found] => next is none +func (o oneTimeJob) next(lastRun time.Time) time.Time { + idx, found := slices.BinarySearchFunc(o.sortedTimes, lastRun, timeCmp()) + // if found, the next run is the following index + if found { + idx++ + } + // exhausted runs + if idx >= len(o.sortedTimes) { + return time.Time{} + } + + return o.sortedTimes[idx] } // ----------------------------------------------- diff --git a/scheduler_test.go b/scheduler_test.go index b3b73357..78044680 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -2089,6 +2089,188 @@ func TestScheduler_OneTimeJob(t *testing.T) { } } +func TestScheduler_AtTimesJob(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + n := time.Now().UTC() + + tests := []struct { + name string + atTimes []time.Time + fakeClock clockwork.FakeClock + assertErr require.ErrorAssertionFunc + // asserts things about schedules, advance time and perform new assertions + advanceAndAsserts []func( + t *testing.T, + j Job, + clock clockwork.FakeClock, + runs *atomic.Uint32, + ) + }{ + { + name: "no at times", + atTimes: []time.Time{}, + fakeClock: clockwork.NewFakeClock(), + assertErr: func(t require.TestingT, err error, i ...interface{}) { + require.ErrorIs(t, err, ErrOneTimeJobStartDateTimePast) + }, + }, + { + name: "all in the past", + atTimes: []time.Time{n.Add(-1 * time.Second)}, + fakeClock: clockwork.NewFakeClockAt(n), + assertErr: func(t require.TestingT, err error, i ...interface{}) { + require.ErrorIs(t, err, ErrOneTimeJobStartDateTimePast) + }, + }, + { + name: "one run 1 millisecond in the future", + atTimes: []time.Time{n.Add(1 * time.Millisecond)}, + fakeClock: clockwork.NewFakeClockAt(n), + advanceAndAsserts: []func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32){ + func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) { + require.Equal(t, uint32(0), runs.Load()) + + // last not initialized + lastRunAt, err := j.LastRun() + require.NoError(t, err) + require.Equal(t, time.Time{}, lastRunAt) + + // next is now + nextRunAt, err := j.NextRun() + require.NoError(t, err) + require.Equal(t, n.Add(1*time.Millisecond), nextRunAt) + + // advance and eventually run + clock.Advance(2 * time.Millisecond) + require.Eventually(t, func() bool { + return assert.Equal(t, uint32(1), runs.Load()) + }, 3*time.Second, 100*time.Millisecond) + + // last was run + lastRunAt, err = j.LastRun() + require.NoError(t, err) + require.WithinDuration(t, n.Add(1*time.Millisecond), lastRunAt, 1*time.Millisecond) + + nextRunAt, err = j.NextRun() + require.NoError(t, err) + require.Equal(t, time.Time{}, nextRunAt) + }, + }, + }, + { + name: "one run in the past and one in the future", + atTimes: []time.Time{n.Add(-1 * time.Millisecond), n.Add(1 * time.Millisecond)}, + fakeClock: clockwork.NewFakeClockAt(n), + advanceAndAsserts: []func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32){ + func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) { + require.Equal(t, uint32(0), runs.Load()) + + // last not initialized + lastRunAt, err := j.LastRun() + require.NoError(t, err) + require.Equal(t, time.Time{}, lastRunAt) + + // next is now + nextRunAt, err := j.NextRun() + require.NoError(t, err) + require.Equal(t, n.Add(1*time.Millisecond), nextRunAt) + + // advance and eventually run + clock.Advance(2 * time.Millisecond) + require.Eventually(t, func() bool { + return assert.Equal(t, uint32(1), runs.Load()) + }, 3*time.Second, 100*time.Millisecond) + + // last was run + lastRunAt, err = j.LastRun() + require.NoError(t, err) + require.WithinDuration(t, n.Add(1*time.Millisecond), lastRunAt, 1*time.Millisecond) + }, + }, + }, + { + name: "two runs in the future", + atTimes: []time.Time{n.Add(1 * time.Millisecond), n.Add(3 * time.Millisecond)}, + fakeClock: clockwork.NewFakeClockAt(n), + advanceAndAsserts: []func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32){ + func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) { + require.Equal(t, uint32(0), runs.Load()) + + // last not initialized + lastRunAt, err := j.LastRun() + require.NoError(t, err) + require.Equal(t, time.Time{}, lastRunAt) + + // next is now + nextRunAt, err := j.NextRun() + require.NoError(t, err) + require.Equal(t, n.Add(1*time.Millisecond), nextRunAt) + + // advance and eventually run + clock.Advance(2 * time.Millisecond) + require.Eventually(t, func() bool { + return assert.Equal(t, uint32(1), runs.Load()) + }, 3*time.Second, 100*time.Millisecond) + + // last was run + lastRunAt, err = j.LastRun() + require.NoError(t, err) + require.WithinDuration(t, n.Add(1*time.Millisecond), lastRunAt, 1*time.Millisecond) + + nextRunAt, err = j.NextRun() + require.NoError(t, err) + require.Equal(t, n.Add(3*time.Millisecond), nextRunAt) + }, + + func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) { + // advance and eventually run + clock.Advance(2 * time.Millisecond) + require.Eventually(t, func() bool { + return assert.Equal(t, uint32(2), runs.Load()) + }, 3*time.Second, 100*time.Millisecond) + + // last was run + lastRunAt, err := j.LastRun() + require.NoError(t, err) + require.WithinDuration(t, n.Add(3*time.Millisecond), lastRunAt, 1*time.Millisecond) + + nextRunAt, err := j.NextRun() + require.NoError(t, err) + require.Equal(t, time.Time{}, nextRunAt) + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := newTestScheduler(t, WithClock(tt.fakeClock)) + t.Cleanup(func() { + require.NoError(t, s.Shutdown()) + }) + + runs := atomic.Uint32{} + j, err := s.NewJob( + OneTimeJob(OneTimeJobStartDateTimes(tt.atTimes...)), + NewTask(func() { + runs.Add(1) + }), + ) + if tt.assertErr != nil { + tt.assertErr(t, err) + } else { + require.NoError(t, err) + s.Start() + + for _, advanceAndAssert := range tt.advanceAndAsserts { + advanceAndAssert(t, j, tt.fakeClock, &runs) + } + } + }) + } +} + func TestScheduler_WithLimitedRuns(t *testing.T) { defer verifyNoGoroutineLeaks(t)