From 3b2dcd869b59102a5ba547e9c795df942688665b Mon Sep 17 00:00:00 2001 From: Higan Date: Sat, 20 Jul 2024 02:10:28 +0800 Subject: [PATCH] issue-654: allow setting a stopTime for job. (#760) --- errors.go | 3 +++ executor.go | 4 ++++ job.go | 38 ++++++++++++++++++++++++++++++++++++++ scheduler.go | 4 ++++ scheduler_test.go | 31 +++++++++++++++++++++++++++++++ 5 files changed, 80 insertions(+) diff --git a/errors.go b/errors.go index 7388cb99..53781257 100644 --- a/errors.go +++ b/errors.go @@ -45,6 +45,9 @@ var ( ErrWithMonitorNil = fmt.Errorf("gocron: WithMonitor: monitor must not be nil") ErrWithNameEmpty = fmt.Errorf("gocron: WithName: name must not be empty") ErrWithStartDateTimePast = fmt.Errorf("gocron: WithStartDateTime: start must not be in the past") + ErrWithStopDateTimePast = fmt.Errorf("gocron: WithStopDateTime: end must not be in the past") + ErrStartTimeLaterThanEndTime = fmt.Errorf("gocron: WithStartDateTime: start must not be later than end") + ErrStopTimeEarlierThanStartTime = fmt.Errorf("gocron: WithStopDateTime: end must not be earlier than start") ErrWithStopTimeoutZeroOrNegative = fmt.Errorf("gocron: WithStopTimeout: timeout must be greater than 0") ) diff --git a/executor.go b/executor.go index 6e85f1a3..1b13285d 100644 --- a/executor.go +++ b/executor.go @@ -358,6 +358,10 @@ func (e *executor) runJob(j internalJob, jIn jobIn) { default: } + if j.stopTimeReached(e.clock.Now()) { + return + } + if e.elector != nil { if err := e.elector.IsLeader(j.ctx); err != nil { e.sendOutForRescheduling(&jIn) diff --git a/job.go b/job.go index e7c2135d..6f09f3d8 100644 --- a/job.go +++ b/job.go @@ -38,6 +38,7 @@ type internalJob struct { limitRunsTo *limitRunsTo startTime time.Time startImmediately bool + stopTime time.Time // event listeners afterJobRuns func(jobID uuid.UUID, jobName string) beforeJobRuns func(jobID uuid.UUID, jobName string) @@ -60,6 +61,13 @@ func (j *internalJob) stop() { j.cancel() } +func (j *internalJob) stopTimeReached(now time.Time) bool { + if j.stopTime.IsZero() { + return false + } + return j.stopTime.Before(now) +} + // task stores the function and parameters // that are actually run when the job is executed. type task struct { @@ -594,11 +602,41 @@ func WithStartDateTime(start time.Time) StartAtOption { if start.IsZero() || start.Before(now) { return ErrWithStartDateTimePast } + if !j.stopTime.IsZero() && j.stopTime.Before(start) { + return ErrStartTimeLaterThanEndTime + } j.startTime = start return nil } } +// WithStopAt sets the option for stopping the job from running +// after the specified time. +func WithStopAt(option StopAtOption) JobOption { + return func(j *internalJob, now time.Time) error { + return option(j, now) + } +} + +// StopAtOption defines options for stopping the job +type StopAtOption func(*internalJob, time.Time) error + +// WithStopDateTime sets the final date & time after which the job should stop. +// This must be in the future and should be after the startTime (if specified). +// The job's final run may be at the stop time, but not after. +func WithStopDateTime(end time.Time) StopAtOption { + return func(j *internalJob, now time.Time) error { + if end.IsZero() || end.Before(now) { + return ErrWithStopDateTimePast + } + if end.Before(j.startTime) { + return ErrStopTimeEarlierThanStartTime + } + j.stopTime = end + return nil + } +} + // WithTags sets the tags for the job. Tags provide // a way to identify jobs by a set of tags and remove // multiple jobs by tag. diff --git a/scheduler.go b/scheduler.go index 02bc1fb6..4131747d 100644 --- a/scheduler.go +++ b/scheduler.go @@ -325,6 +325,10 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) { return } + if j.stopTimeReached(s.now()) { + return + } + scheduleFrom := j.lastRun if len(j.nextScheduled) > 0 { // always grab the last element in the slice as that is the furthest diff --git a/scheduler_test.go b/scheduler_test.go index 6f47e8af..212eb70d 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -150,6 +150,21 @@ func TestScheduler_LongRunningJobs(t *testing.T) { options []SchedulerOption expectedRuns int }{ + { + "duration with stop time between executions", + durationCh, + DurationJob( + time.Millisecond * 500, + ), + NewTask( + func() { + time.Sleep(1 * time.Second) + durationCh <- struct{}{} + }), + []JobOption{WithStopAt(WithStopDateTime(time.Now().Add(time.Millisecond * 1100)))}, + []SchedulerOption{WithStopTimeout(time.Second * 2)}, + 2, + }, { "duration", durationCh, @@ -755,6 +770,22 @@ func TestScheduler_NewJobErrors(t *testing.T) { []JobOption{WithStartAt(WithStartDateTime(time.Now().Add(-time.Second)))}, ErrWithStartDateTimePast, }, + { + "WithStartDateTime is later than the end", + DurationJob( + time.Second, + ), + []JobOption{WithStopAt(WithStopDateTime(time.Now().Add(time.Second))), WithStartAt(WithStartDateTime(time.Now().Add(time.Hour)))}, + ErrStartTimeLaterThanEndTime, + }, + { + "WithStopDateTime is earlier than the start", + DurationJob( + time.Second, + ), + []JobOption{WithStartAt(WithStartDateTime(time.Now().Add(time.Hour))), WithStopAt(WithStopDateTime(time.Now().Add(time.Second)))}, + ErrStopTimeEarlierThanStartTime, + }, { "oneTimeJob start at is zero", OneTimeJob(OneTimeJobStartDateTime(time.Time{})),