Skip to content

Commit

Permalink
fix: job queue panic recover (#219)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mahanmmi committed Jun 18, 2024
1 parent 321339b commit fa9a485
Showing 1 changed file with 18 additions and 12 deletions.
30 changes: 18 additions & 12 deletions pkg/plugin/sdk/job_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,27 @@ func (q *JobQueue) Push(job Job) {
q.queue <- job
}

func (q *JobQueue) Start() {
for i := 0; i < q.maxConcurrent; i++ {
go q.run()
func (q *JobQueue) finisher() {
if err := recover(); err != nil {
log.Printf("Job queue finisher panic: %v", err)
go q.finisher()
}

go func() {
for {
if q.finishedCounter.Load() == q.pendingCounter.Load() && q.onFinish != nil {
time.Sleep(500 * time.Millisecond)
log.Printf("All jobs are finished - calling onFinish, job counts: %d/%d", q.finishedCounter.Load(), q.pendingCounter.Load())
q.onFinish()
}
for {
if q.finishedCounter.Load() == q.pendingCounter.Load() && q.onFinish != nil {
time.Sleep(500 * time.Millisecond)
log.Printf("All jobs are finished - calling onFinish, job counts: %d/%d", q.finishedCounter.Load(), q.pendingCounter.Load())
q.onFinish()
}
}()
time.Sleep(500 * time.Millisecond)
}
}

func (q *JobQueue) Start() {
for i := 0; i < q.maxConcurrent; i++ {
go q.run()
}
go q.finisher()
}

func (q *JobQueue) SetOnFinish(f func()) {
Expand Down Expand Up @@ -101,7 +107,7 @@ func (q *JobQueue) run() {
log.Printf("Finished job %s", job.Id())
}

q.stream.Send(&golang.PluginMessage{
_ = q.stream.Send(&golang.PluginMessage{
PluginMessage: &golang.PluginMessage_Job{
Job: jobResult,
},
Expand Down

0 comments on commit fa9a485

Please sign in to comment.