diff --git a/pkg/plugin/sdk/job_queue.go b/pkg/plugin/sdk/job_queue.go index 02f122e..7a1f8ec 100644 --- a/pkg/plugin/sdk/job_queue.go +++ b/pkg/plugin/sdk/job_queue.go @@ -53,21 +53,27 @@ func (q *JobQueue) Push(job Job) { q.queue <- job } -func (q *JobQueue) Start() { - for i := 0; i < q.maxConcurrent; i++ { - go q.run() +func (q *JobQueue) finisher() { + if err := recover(); err != nil { + log.Printf("Job queue finisher panic: %v", err) + go q.finisher() } - go func() { - for { - if q.finishedCounter.Load() == q.pendingCounter.Load() && q.onFinish != nil { - time.Sleep(500 * time.Millisecond) - log.Printf("All jobs are finished - calling onFinish, job counts: %d/%d", q.finishedCounter.Load(), q.pendingCounter.Load()) - q.onFinish() - } + for { + if q.finishedCounter.Load() == q.pendingCounter.Load() && q.onFinish != nil { time.Sleep(500 * time.Millisecond) + log.Printf("All jobs are finished - calling onFinish, job counts: %d/%d", q.finishedCounter.Load(), q.pendingCounter.Load()) + q.onFinish() } - }() + time.Sleep(500 * time.Millisecond) + } +} + +func (q *JobQueue) Start() { + for i := 0; i < q.maxConcurrent; i++ { + go q.run() + } + go q.finisher() } func (q *JobQueue) SetOnFinish(f func()) { @@ -101,7 +107,7 @@ func (q *JobQueue) run() { log.Printf("Finished job %s", job.Id()) } - q.stream.Send(&golang.PluginMessage{ + _ = q.stream.Send(&golang.PluginMessage{ PluginMessage: &golang.PluginMessage_Job{ Job: jobResult, },