diff --git a/CHANGELOG.md b/CHANGELOG.md index c74b48a..0a54005 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Bump google.golang.org/grpc from 1.53.0 to 1.56.3 ([#39](https://github.com/microsoft/durabletask-go/pull/39)) - Updated durabletask-protobuf submodule to [`4207e1d`](https://github.com/microsoft/durabletask-protobuf/commit/4207e1dbd14cedc268f69c3befee60fcaad19367) +- Add retries to GetWorkItems stream connection ([#72](https://github.com/microsoft/durabletask-go/pull/72)) - by [@famarting](https://github.com/famarting) ## [v0.4.0] - 2023-12-18 diff --git a/backend/activity.go b/backend/activity.go index bf53871..8f3d8df 100644 --- a/backend/activity.go +++ b/backend/activity.go @@ -23,7 +23,7 @@ type ActivityExecutor interface { func NewActivityTaskWorker(be Backend, executor ActivityExecutor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker { processor := newActivityProcessor(be, executor) - return NewTaskWorker(be, processor, logger, opts...) + return NewTaskWorker(processor, logger, opts...) } func newActivityProcessor(be Backend, executor ActivityExecutor) TaskProcessor { diff --git a/backend/orchestration.go b/backend/orchestration.go index 3892569..c416da2 100644 --- a/backend/orchestration.go +++ b/backend/orchestration.go @@ -35,7 +35,7 @@ func NewOrchestrationWorker(be Backend, executor OrchestratorExecutor, logger Lo executor: executor, logger: logger, } - return NewTaskWorker(be, processor, logger, opts...) + return NewTaskWorker(processor, logger, opts...) } // Name implements TaskProcessor diff --git a/backend/worker.go b/backend/worker.go index 9cb6f8c..c11bb4e 100644 --- a/backend/worker.go +++ b/backend/worker.go @@ -32,7 +32,6 @@ type TaskProcessor interface { } type worker struct { - backend Backend options *WorkerOptions logger Logger // dispatchSemaphore is for throttling orchestration concurrency. @@ -66,13 +65,12 @@ func WithMaxParallelism(n int32) NewTaskWorkerOptions { } } -func NewTaskWorker(be Backend, p TaskProcessor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker { +func NewTaskWorker(p TaskProcessor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker { options := &WorkerOptions{MaxParallelWorkItems: 1} for _, configure := range opts { configure(options) } return &worker{ - backend: be, processor: p, logger: logger, dispatchSemaphore: semaphore.New(int(options.MaxParallelWorkItems)), diff --git a/client/worker_grpc.go b/client/worker_grpc.go index 6c5ed70..6532e8d 100644 --- a/client/worker_grpc.go +++ b/client/worker_grpc.go @@ -6,26 +6,45 @@ import ( "io" "time" + "github.com/cenkalti/backoff/v4" "github.com/microsoft/durabletask-go/api" "github.com/microsoft/durabletask-go/backend" "github.com/microsoft/durabletask-go/internal/helpers" "github.com/microsoft/durabletask-go/internal/protos" "github.com/microsoft/durabletask-go/task" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/wrapperspb" ) +type workItemsStream interface { + Recv() (*protos.WorkItem, error) +} + func (c *TaskHubGrpcClient) StartWorkItemListener(ctx context.Context, r *task.TaskRegistry) error { executor := task.NewTaskExecutor(r) - if _, err := c.client.Hello(ctx, &emptypb.Empty{}); err != nil { - return fmt.Errorf("failed to connect to task hub service: %w", err) + var stream workItemsStream + + initStream := func() error { + _, err := c.client.Hello(ctx, &emptypb.Empty{}) + if err != nil { + return fmt.Errorf("failed to connect to task hub service: %w", err) + } + + req := protos.GetWorkItemsRequest{} + stream, err = c.client.GetWorkItems(ctx, &req) + if err != nil { + return fmt.Errorf("failed to get work item stream: %w", err) + } + return nil } - req := protos.GetWorkItemsRequest{} - stream, err := c.client.GetWorkItems(ctx, &req) + c.logger.Infof("connecting work item listener stream") + err := initStream() if err != nil { - return fmt.Errorf("failed to get work item stream: %w", err) + return err } go func() { @@ -33,19 +52,68 @@ func (c *TaskHubGrpcClient) StartWorkItemListener(ctx context.Context, r *task.T for { // TODO: Manage concurrency workItem, err := stream.Recv() - if err == io.EOF || stream.Context().Err() != nil { - // shutdown - break - } else if err != nil { - c.logger.Warnf("failed to establish work item stream: %v", err) - time.Sleep(5 * time.Second) + + if err != nil { + // user wants to stop the listener + if ctx.Err() != nil { + c.logger.Infof("stopping background processor: %v", err) + return + } + + retriable := false + + c.logger.Errorf("background processor received stream error: %v", err) + + if err == io.EOF { + retriable = true + } else if grpcStatus, ok := status.FromError(err); ok { + c.logger.Warnf("received grpc error code %v", grpcStatus.Code().String()) + switch grpcStatus.Code() { + case codes.Unavailable: + fallthrough + case codes.Canceled: + fallthrough + default: + retriable = true + } + } + + if !retriable { + c.logger.Infof("stopping background processor, non retriable error: %v", err) + return + } + + err = backoff.Retry( + func() error { + // user wants to stop the listener + if ctx.Err() != nil { + return backoff.Permanent(ctx.Err()) + } + + c.logger.Infof("reconnecting work item listener stream") + streamErr := initStream() + if streamErr != nil { + c.logger.Errorf("error initializing work item listener stream %v", streamErr) + return streamErr + } + return nil + }, + // retry forever since we don't have a way of asynchronously return errors to the user + newInfiniteRetries(), + ) + if err != nil { + c.logger.Infof("stopping background processor, unable to reconnect stream: %v", err) + return + } + c.logger.Infof("successfully reconnected work item listener stream...") + // continue iterating continue } if orchReq := workItem.GetOrchestratorRequest(); orchReq != nil { - go c.processOrchestrationWorkItem(stream.Context(), executor, orchReq) + go c.processOrchestrationWorkItem(ctx, executor, orchReq) } else if actReq := workItem.GetActivityRequest(); actReq != nil { - go c.processActivityWorkItem(stream.Context(), executor, actReq) + go c.processActivityWorkItem(ctx, executor, actReq) } else { c.logger.Warnf("received unknown work item type: %v", workItem) } @@ -125,3 +193,13 @@ func (c *TaskHubGrpcClient) processActivityWorkItem( } } } + +func newInfiniteRetries() *backoff.ExponentialBackOff { + b := backoff.NewExponentialBackOff() + // max wait of 15 seconds between retries + b.MaxInterval = 15 * time.Second + // retry forever + b.MaxElapsedTime = 0 + b.Reset() + return b +}