From 256265f9a206d71d7fcb753181c9409e4fc575f3 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Thu, 18 Jul 2024 11:32:56 -0500 Subject: [PATCH] internal refactoring of JobOption constructor, clock moved to exec (#761) --- .github/workflows/go_test.yml | 2 +- .golangci.yaml | 25 +++++------- executor.go | 51 ++++++++++++++++++------- job.go | 33 ++++++++-------- job_test.go | 2 +- scheduler.go | 72 ++++++++++++++++++++++------------- scheduler_test.go | 44 ++++++++++----------- util_test.go | 2 +- 8 files changed, 133 insertions(+), 98 deletions(-) diff --git a/.github/workflows/go_test.yml b/.github/workflows/go_test.yml index e75ccff0..7384e6e6 100644 --- a/.github/workflows/go_test.yml +++ b/.github/workflows/go_test.yml @@ -27,6 +27,6 @@ - name: golangci-lint uses: golangci/golangci-lint-action@v6.0.1 with: - version: v1.55.2 + version: v1.59.1 - name: test run: make test_ci diff --git a/.golangci.yaml b/.golangci.yaml index 07878d85..9d6ae5d7 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -2,14 +2,20 @@ run: timeout: 5m issues-exit-code: 1 tests: true - skip-dirs: - - local issues: max-same-issues: 100 include: - EXC0012 - EXC0014 + exclude-dirs: + - local + exclude-rules: + - path: example_test.go + linters: + - revive + text: "seems to be unused" + fix: true linters: enable: @@ -29,21 +35,10 @@ linters: - whitespace output: - # colored-line-number|line-number|json|tab|checkstyle|code-climate, default is "colored-line-number" - format: colored-line-number - # print lines of code with issue, default is true + formats: + - format: colored-line-number print-issued-lines: true - # print linter name in the end of issue text, default is true print-linter-name: true - # make issues output unique by line, default is true uniq-by-line: true - # add a prefix to the output file references; default is no prefix path-prefix: "" - # sorts results by: filepath, line and column sort-results: true - -linters-settings: - golint: - min-confidence: 0.8 - -fix: true diff --git a/executor.go b/executor.go index f3661970..6e85f1a3 100644 --- a/executor.go +++ b/executor.go @@ -7,25 +7,48 @@ import ( "sync" "time" + "github.com/jonboulle/clockwork" + "github.com/google/uuid" ) type executor struct { - ctx context.Context - cancel context.CancelFunc - logger Logger - stopCh chan struct{} - jobsIn chan jobIn + // context used for shutting down + ctx context.Context + // cancel used by the executor to signal a stop of it's functions + cancel context.CancelFunc + // clock used for regular time or mocking time + clock clockwork.Clock + // the executor's logger + logger Logger + + // receives jobs scheduled to execute + jobsIn chan jobIn + // sends out jobs for rescheduling jobsOutForRescheduling chan uuid.UUID - jobsOutCompleted chan uuid.UUID - jobOutRequest chan jobOutRequest - stopTimeout time.Duration - done chan error - singletonRunners *sync.Map // map[uuid.UUID]singletonRunner - limitMode *limitModeConfig - elector Elector - locker Locker - monitor Monitor + // sends out jobs once completed + jobsOutCompleted chan uuid.UUID + // used to request jobs from the scheduler + jobOutRequest chan jobOutRequest + + // used by the executor to receive a stop signal from the scheduler + stopCh chan struct{} + // the timeout value when stopping + stopTimeout time.Duration + // used to signal that the executor has completed shutdown + done chan error + + // runners for any singleton type jobs + // map[uuid.UUID]singletonRunner + singletonRunners *sync.Map + // config for limit mode + limitMode *limitModeConfig + // the elector when running distributed instances + elector Elector + // the locker when running distributed instances + locker Locker + // monitor for reporting metrics + monitor Monitor } type jobIn struct { diff --git a/job.go b/job.go index 0668a5ff..e7c2135d 100644 --- a/job.go +++ b/job.go @@ -475,7 +475,7 @@ func OneTimeJobStartImmediately() OneTimeJobStartAtOption { // OneTimeJobStartDateTime sets the date & time at which the job should run. // This datetime must be in the future (according to the scheduler clock). func OneTimeJobStartDateTime(start time.Time) OneTimeJobStartAtOption { - return func(j *internalJob) []time.Time { + return func(_ *internalJob) []time.Time { return []time.Time{start} } } @@ -483,7 +483,7 @@ func OneTimeJobStartDateTime(start time.Time) OneTimeJobStartAtOption { // OneTimeJobStartDateTimes sets the date & times at which the job should run. // At least one of the date/times must be in the future (according to the scheduler clock). func OneTimeJobStartDateTimes(times ...time.Time) OneTimeJobStartAtOption { - return func(j *internalJob) []time.Time { + return func(_ *internalJob) []time.Time { return times } } @@ -503,13 +503,13 @@ func OneTimeJob(startAt OneTimeJobStartAtOption) JobDefinition { // ----------------------------------------------- // JobOption defines the constructor for job options. -type JobOption func(*internalJob) error +type JobOption func(*internalJob, time.Time) error // WithDistributedJobLocker sets the locker to be used by multiple // Scheduler instances to ensure that only one instance of each // job is run. func WithDistributedJobLocker(locker Locker) JobOption { - return func(j *internalJob) error { + return func(j *internalJob, _ time.Time) error { if locker == nil { return ErrWithDistributedJobLockerNil } @@ -521,7 +521,7 @@ func WithDistributedJobLocker(locker Locker) JobOption { // WithEventListeners sets the event listeners that should be // run for the job. func WithEventListeners(eventListeners ...EventListener) JobOption { - return func(j *internalJob) error { + return func(j *internalJob, _ time.Time) error { for _, eventListener := range eventListeners { if err := eventListener(j); err != nil { return err @@ -534,7 +534,7 @@ func WithEventListeners(eventListeners ...EventListener) JobOption { // WithLimitedRuns limits the number of executions of this job to n. // Upon reaching the limit, the job is removed from the scheduler. func WithLimitedRuns(limit uint) JobOption { - return func(j *internalJob) error { + return func(j *internalJob, _ time.Time) error { j.limitRunsTo = &limitRunsTo{ limit: limit, runCount: 0, @@ -546,8 +546,7 @@ func WithLimitedRuns(limit uint) JobOption { // WithName sets the name of the job. Name provides // a human-readable identifier for the job. func WithName(name string) JobOption { - // TODO use the name for metrics and future logging option - return func(j *internalJob) error { + return func(j *internalJob, _ time.Time) error { if name == "" { return ErrWithNameEmpty } @@ -560,7 +559,7 @@ func WithName(name string) JobOption { // This is useful for jobs that should not overlap, and that occasionally // (but not consistently) run longer than the interval between job runs. func WithSingletonMode(mode LimitMode) JobOption { - return func(j *internalJob) error { + return func(j *internalJob, _ time.Time) error { j.singletonMode = true j.singletonLimitMode = mode return nil @@ -570,19 +569,19 @@ func WithSingletonMode(mode LimitMode) JobOption { // WithStartAt sets the option for starting the job at // a specific datetime. func WithStartAt(option StartAtOption) JobOption { - return func(j *internalJob) error { - return option(j) + return func(j *internalJob, now time.Time) error { + return option(j, now) } } // StartAtOption defines options for starting the job -type StartAtOption func(*internalJob) error +type StartAtOption func(*internalJob, time.Time) error // WithStartImmediately tells the scheduler to run the job immediately // regardless of the type or schedule of job. After this immediate run // the job is scheduled from this time based on the job definition. func WithStartImmediately() StartAtOption { - return func(j *internalJob) error { + return func(j *internalJob, _ time.Time) error { j.startImmediately = true return nil } @@ -591,8 +590,8 @@ func WithStartImmediately() StartAtOption { // WithStartDateTime sets the first date & time at which the job should run. // This datetime must be in the future. func WithStartDateTime(start time.Time) StartAtOption { - return func(j *internalJob) error { - if start.IsZero() || start.Before(time.Now()) { + return func(j *internalJob, now time.Time) error { + if start.IsZero() || start.Before(now) { return ErrWithStartDateTimePast } j.startTime = start @@ -604,7 +603,7 @@ func WithStartDateTime(start time.Time) StartAtOption { // a way to identify jobs by a set of tags and remove // multiple jobs by tag. func WithTags(tags ...string) JobOption { - return func(j *internalJob) error { + return func(j *internalJob, _ time.Time) error { j.tags = tags return nil } @@ -614,7 +613,7 @@ func WithTags(tags ...string) JobOption { // is used to uniquely identify the job and is used for logging // and metrics. func WithIdentifier(id uuid.UUID) JobOption { - return func(j *internalJob) error { + return func(j *internalJob, _ time.Time) error { if id == uuid.Nil { return ErrWithIdentifierNil } diff --git a/job_test.go b/job_test.go index d9076eeb..29178924 100644 --- a/job_test.go +++ b/job_test.go @@ -487,7 +487,7 @@ func TestWithEventListeners(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { var ij internalJob - err := WithEventListeners(tt.eventListeners...)(&ij) + err := WithEventListeners(tt.eventListeners...)(&ij, time.Now()) assert.Equal(t, tt.err, err) if err != nil { diff --git a/scheduler.go b/scheduler.go index a9e0fc5a..02bc1fb6 100644 --- a/scheduler.go +++ b/scheduler.go @@ -56,25 +56,43 @@ type Scheduler interface { // ----------------------------------------------- type scheduler struct { - shutdownCtx context.Context - shutdownCancel context.CancelFunc - exec executor - jobs map[uuid.UUID]internalJob - location *time.Location - clock clockwork.Clock - started bool + // context used for shutting down + shutdownCtx context.Context + // cancel used to signal scheduler should shut down + shutdownCancel context.CancelFunc + // the executor, which actually runs the jobs sent to it via the scheduler + exec executor + // the map of jobs registered in the scheduler + jobs map[uuid.UUID]internalJob + // the location used by the scheduler for scheduling when relevant + location *time.Location + // whether the scheduler has been started or not + started bool + // globally applied JobOption's set on all jobs added to the scheduler + // note: individually set JobOption's take precedence. globalJobOptions []JobOption - logger Logger - - startCh chan struct{} - startedCh chan struct{} - stopCh chan struct{} - stopErrCh chan error - allJobsOutRequest chan allJobsOutRequest - jobOutRequestCh chan jobOutRequest - runJobRequestCh chan runJobRequest - newJobCh chan newJobIn - removeJobCh chan uuid.UUID + // the scheduler's logger + logger Logger + + // used to tell the scheduler to start + startCh chan struct{} + // used to report that the scheduler has started + startedCh chan struct{} + // used to tell the scheduler to stop + stopCh chan struct{} + // used to report that the scheduler has stopped + stopErrCh chan error + // used to send all the jobs out when a request is made by the client + allJobsOutRequest chan allJobsOutRequest + // used to send a jobs out when a request is made by the client + jobOutRequestCh chan jobOutRequest + // used to run a job on-demand when requested by the client + runJobRequestCh chan runJobRequest + // new jobs are received here + newJobCh chan newJobIn + // requests from the client to remove jobs by ID are received here + removeJobCh chan uuid.UUID + // requests from the client to remove jobs by tags are received here removeJobsByTagsCh chan []string } @@ -111,6 +129,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { stopTimeout: time.Second * 10, singletonRunners: nil, logger: &noOpLogger{}, + clock: clockwork.NewRealClock(), jobsIn: make(chan jobIn), jobsOutForRescheduling: make(chan uuid.UUID), @@ -125,7 +144,6 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { exec: exec, jobs: make(map[uuid.UUID]internalJob), location: time.Local, - clock: clockwork.NewRealClock(), logger: &noOpLogger{}, newJobCh: make(chan newJobIn), @@ -338,7 +356,7 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) { } } j.nextScheduled = append(j.nextScheduled, next) - j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() { + j.timer = s.exec.clock.AfterFunc(next.Sub(s.now()), func() { // set the actual timer on the job here and listen for // shut down events so that the job doesn't attempt to // run if the scheduler has been shutdown. @@ -422,7 +440,7 @@ func (s *scheduler) selectNewJob(in newJobIn) { } id := j.id - j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() { + j.timer = s.exec.clock.AfterFunc(next.Sub(s.now()), func() { select { case <-s.shutdownCtx.Done(): case s.exec.jobsIn <- jobIn{ @@ -474,7 +492,7 @@ func (s *scheduler) selectStart() { } jobID := id - j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() { + j.timer = s.exec.clock.AfterFunc(next.Sub(s.now()), func() { select { case <-s.shutdownCtx.Done(): case s.exec.jobsIn <- jobIn{ @@ -502,7 +520,7 @@ func (s *scheduler) selectStart() { // ----------------------------------------------- func (s *scheduler) now() time.Time { - return s.clock.Now().In(s.location) + return s.exec.clock.Now().In(s.location) } func (s *scheduler) jobFromInternalJob(in internalJob) job { @@ -643,19 +661,19 @@ func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskW // apply global job options for _, option := range s.globalJobOptions { - if err := option(&j); err != nil { + if err := option(&j, s.now()); err != nil { return nil, err } } // apply job specific options, which take precedence for _, option := range options { - if err := option(&j); err != nil { + if err := option(&j, s.now()); err != nil { return nil, err } } - if err := definition.setup(&j, s.location, s.clock.Now()); err != nil { + if err := definition.setup(&j, s.location, s.exec.clock.Now()); err != nil { return nil, err } @@ -758,7 +776,7 @@ func WithClock(clock clockwork.Clock) SchedulerOption { if clock == nil { return ErrWithClockNil } - s.clock = clock + s.exec.clock = clock return nil } } diff --git a/scheduler_test.go b/scheduler_test.go index e55b47b7..6f47e8af 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -812,7 +812,7 @@ func TestScheduler_NewJobTask(t *testing.T) { defer verifyNoGoroutineLeaks(t) testFuncPtr := func() {} - testFuncWithParams := func(one, two string) {} + testFuncWithParams := func(_, _ string) {} testStruct := struct{}{} tests := []struct { @@ -862,37 +862,37 @@ func TestScheduler_NewJobTask(t *testing.T) { }, { "all good struct", - NewTask(func(one struct{}) {}, struct{}{}), + NewTask(func(_ struct{}) {}, struct{}{}), nil, }, { "all good interface", - NewTask(func(one interface{}) {}, struct{}{}), + NewTask(func(_ interface{}) {}, struct{}{}), nil, }, { "all good any", - NewTask(func(one any) {}, struct{}{}), + NewTask(func(_ any) {}, struct{}{}), nil, }, { "all good slice", - NewTask(func(one []struct{}) {}, []struct{}{}), + NewTask(func(_ []struct{}) {}, []struct{}{}), nil, }, { "all good chan", - NewTask(func(one chan struct{}) {}, make(chan struct{})), + NewTask(func(_ chan struct{}) {}, make(chan struct{})), nil, }, { "all good pointer", - NewTask(func(one *struct{}) {}, &testStruct), + NewTask(func(_ *struct{}) {}, &testStruct), nil, }, { "all good map", - NewTask(func(one map[string]struct{}) {}, make(map[string]struct{})), + NewTask(func(_ map[string]struct{}) {}, make(map[string]struct{})), nil, }, { @@ -902,37 +902,37 @@ func TestScheduler_NewJobTask(t *testing.T) { }, { "parameter type does not match - different argument types against variadic parameters", - NewTask(func(args ...string) {}, "one", 2), + NewTask(func(_ ...string) {}, "one", 2), ErrNewJobWrongTypeOfParameters, }, { "all good string - variadic", - NewTask(func(args ...string) {}, "one", "two"), + NewTask(func(_ ...string) {}, "one", "two"), nil, }, { "all good mixed variadic", - NewTask(func(arg int, args ...string) {}, 1, "one", "two"), + NewTask(func(_ int, _ ...string) {}, 1, "one", "two"), nil, }, { "all good struct - variadic", - NewTask(func(args ...interface{}) {}, struct{}{}), + NewTask(func(_ ...interface{}) {}, struct{}{}), nil, }, { "all good no arguments passed in - variadic", - NewTask(func(args ...interface{}) {}), + NewTask(func(_ ...interface{}) {}), nil, }, { "all good - interface variadic, int, string", - NewTask(func(args ...interface{}) {}, 1, "2", 3.0), + NewTask(func(_ ...interface{}) {}, 1, "2", 3.0), nil, }, { "parameter type does not match - different argument types against interface variadic parameters", - NewTask(func(args ...io.Reader) {}, os.Stdout, any(3.0)), + NewTask(func(_ ...io.Reader) {}, os.Stdout, any(3.0)), ErrNewJobWrongTypeOfParameters, }, } @@ -1411,7 +1411,7 @@ func TestScheduler_WithDistributed(t *testing.T) { WithDistributedLocker(&testLocker{notLocked: notLocked}), }, nil, - func(t *testing.T) { + func(_ *testing.T) { timeout := time.Now().Add(1 * time.Second) var notLockedCount int for { @@ -1433,7 +1433,7 @@ func TestScheduler_WithDistributed(t *testing.T) { []JobOption{ WithDistributedJobLocker(&testLocker{notLocked: notLocked}), }, - func(t *testing.T) { + func(_ *testing.T) { timeout := time.Now().Add(1 * time.Second) var notLockedCount int for { @@ -1665,7 +1665,7 @@ func TestScheduler_RemoveJob_RemoveSelf(t *testing.T) { NewTask(func() {}), WithEventListeners( BeforeJobRuns( - func(jobID uuid.UUID, jobName string) { + func(_ uuid.UUID, _ string) { s.RemoveByTags("tag1") }, ), @@ -1788,7 +1788,7 @@ func TestScheduler_WithLocker_WithEventListeners(t *testing.T) { "AfterLockError", errorLocker{}, NewTask(func() {}), - AfterLockError(func(_ uuid.UUID, _ string, err error) { + AfterLockError(func(_ uuid.UUID, _ string, _ error) { listenerRunCh <- nil }), true, @@ -2031,7 +2031,7 @@ func TestScheduler_LastRunSingleton(t *testing.T) { }{ { "simple", - func(t *testing.T, j Job, jobRan chan struct{}) {}, + func(_ *testing.T, _ Job, _ chan struct{}) {}, }, { "with runNow", @@ -2155,7 +2155,7 @@ func TestScheduler_AtTimesJob(t *testing.T) { name: "no at times", atTimes: []time.Time{}, fakeClock: clockwork.NewFakeClock(), - assertErr: func(t require.TestingT, err error, i ...interface{}) { + assertErr: func(t require.TestingT, err error, _ ...interface{}) { require.ErrorIs(t, err, ErrOneTimeJobStartDateTimePast) }, }, @@ -2163,7 +2163,7 @@ func TestScheduler_AtTimesJob(t *testing.T) { name: "all in the past", atTimes: []time.Time{n.Add(-1 * time.Second)}, fakeClock: clockwork.NewFakeClockAt(n), - assertErr: func(t require.TestingT, err error, i ...interface{}) { + assertErr: func(t require.TestingT, err error, _ ...interface{}) { require.ErrorIs(t, err, ErrOneTimeJobStartDateTimePast) }, }, diff --git a/util_test.go b/util_test.go index 2214ade9..05cc82d5 100644 --- a/util_test.go +++ b/util_test.go @@ -57,7 +57,7 @@ func TestCallJobFuncWithParams(t *testing.T) { }, { "wrong number of params", - func(one string, two int) {}, + func(_ string, _ int) {}, []any{"one"}, nil, },