Skip to content

Commit

Permalink
Add retries to GetWorkItems stream connection (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
famarting authored Jun 21, 2024
1 parent 60e42f2 commit bfcc333
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Bump google.golang.org/grpc from 1.53.0 to 1.56.3 ([#39](https://github.com/microsoft/durabletask-go/pull/39))
- Updated durabletask-protobuf submodule to [`4207e1d`](https://github.com/microsoft/durabletask-protobuf/commit/4207e1dbd14cedc268f69c3befee60fcaad19367)
- Add retries to GetWorkItems stream connection ([#72](https://github.com/microsoft/durabletask-go/pull/72)) - by [@famarting](https://github.com/famarting)

## [v0.4.0] - 2023-12-18

Expand Down
2 changes: 1 addition & 1 deletion backend/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type ActivityExecutor interface {

// NewActivityTaskWorker returns a TaskWorker for activity work items.
//
// It builds a TaskProcessor from be and executor via newActivityProcessor and
// hands that processor, together with logger and any opts, to NewTaskWorker.
func NewActivityTaskWorker(be Backend, executor ActivityExecutor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker {
processor := newActivityProcessor(be, executor)
return NewTaskWorker(be, processor, logger, opts...)
return NewTaskWorker(processor, logger, opts...)
}

func newActivityProcessor(be Backend, executor ActivityExecutor) TaskProcessor {
Expand Down
2 changes: 1 addition & 1 deletion backend/orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewOrchestrationWorker(be Backend, executor OrchestratorExecutor, logger Lo
executor: executor,
logger: logger,
}
return NewTaskWorker(be, processor, logger, opts...)
return NewTaskWorker(processor, logger, opts...)
}

// Name implements TaskProcessor
Expand Down
4 changes: 1 addition & 3 deletions backend/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ type TaskProcessor interface {
}

type worker struct {
backend Backend
options *WorkerOptions
logger Logger
// dispatchSemaphore is for throttling orchestration concurrency.
Expand Down Expand Up @@ -66,13 +65,12 @@ func WithMaxParallelism(n int32) NewTaskWorkerOptions {
}
}

func NewTaskWorker(be Backend, p TaskProcessor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker {
func NewTaskWorker(p TaskProcessor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker {
options := &WorkerOptions{MaxParallelWorkItems: 1}
for _, configure := range opts {
configure(options)
}
return &worker{
backend: be,
processor: p,
logger: logger,
dispatchSemaphore: semaphore.New(int(options.MaxParallelWorkItems)),
Expand Down
104 changes: 91 additions & 13 deletions client/worker_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,46 +6,114 @@ import (
"io"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/microsoft/durabletask-go/api"
"github.com/microsoft/durabletask-go/backend"
"github.com/microsoft/durabletask-go/internal/helpers"
"github.com/microsoft/durabletask-go/internal/protos"
"github.com/microsoft/durabletask-go/task"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/wrapperspb"
)

// workItemsStream is the minimal view of a work-item stream: a single Recv
// method that yields the next *protos.WorkItem or an error.
type workItemsStream interface {
// Recv returns the next work item from the stream, or an error.
Recv() (*protos.WorkItem, error)
}

func (c *TaskHubGrpcClient) StartWorkItemListener(ctx context.Context, r *task.TaskRegistry) error {
executor := task.NewTaskExecutor(r)

if _, err := c.client.Hello(ctx, &emptypb.Empty{}); err != nil {
return fmt.Errorf("failed to connect to task hub service: %w", err)
var stream workItemsStream

initStream := func() error {
_, err := c.client.Hello(ctx, &emptypb.Empty{})
if err != nil {
return fmt.Errorf("failed to connect to task hub service: %w", err)
}

req := protos.GetWorkItemsRequest{}
stream, err = c.client.GetWorkItems(ctx, &req)
if err != nil {
return fmt.Errorf("failed to get work item stream: %w", err)
}
return nil
}

req := protos.GetWorkItemsRequest{}
stream, err := c.client.GetWorkItems(ctx, &req)
c.logger.Infof("connecting work item listener stream")
err := initStream()
if err != nil {
return fmt.Errorf("failed to get work item stream: %w", err)
return err
}

go func() {
c.logger.Info("starting background processor")
for {
// TODO: Manage concurrency
workItem, err := stream.Recv()
if err == io.EOF || stream.Context().Err() != nil {
// shutdown
break
} else if err != nil {
c.logger.Warnf("failed to establish work item stream: %v", err)
time.Sleep(5 * time.Second)

if err != nil {
// user wants to stop the listener
if ctx.Err() != nil {
c.logger.Infof("stopping background processor: %v", err)
return
}

retriable := false

c.logger.Errorf("background processor received stream error: %v", err)

if err == io.EOF {
retriable = true
} else if grpcStatus, ok := status.FromError(err); ok {
c.logger.Warnf("received grpc error code %v", grpcStatus.Code().String())
switch grpcStatus.Code() {
case codes.Unavailable:
fallthrough
case codes.Canceled:
fallthrough
default:
retriable = true
}
}

if !retriable {
c.logger.Infof("stopping background processor, non retriable error: %v", err)
return
}

err = backoff.Retry(
func() error {
// user wants to stop the listener
if ctx.Err() != nil {
return backoff.Permanent(ctx.Err())
}

c.logger.Infof("reconnecting work item listener stream")
streamErr := initStream()
if streamErr != nil {
c.logger.Errorf("error initializing work item listener stream %v", streamErr)
return streamErr
}
return nil
},
// retry forever since we don't have a way of asynchronously returning errors to the user
newInfiniteRetries(),
)
if err != nil {
c.logger.Infof("stopping background processor, unable to reconnect stream: %v", err)
return
}
c.logger.Infof("successfully reconnected work item listener stream...")
// continue iterating
continue
}

if orchReq := workItem.GetOrchestratorRequest(); orchReq != nil {
go c.processOrchestrationWorkItem(stream.Context(), executor, orchReq)
go c.processOrchestrationWorkItem(ctx, executor, orchReq)
} else if actReq := workItem.GetActivityRequest(); actReq != nil {
go c.processActivityWorkItem(stream.Context(), executor, actReq)
go c.processActivityWorkItem(ctx, executor, actReq)
} else {
c.logger.Warnf("received unknown work item type: %v", workItem)
}
Expand Down Expand Up @@ -125,3 +193,13 @@ func (c *TaskHubGrpcClient) processActivityWorkItem(
}
}
}

// newInfiniteRetries builds the exponential backoff policy used when
// reconnecting: it never gives up, and the wait between attempts is capped
// at 15 seconds.
func newInfiniteRetries() *backoff.ExponentialBackOff {
	policy := backoff.NewExponentialBackOff()

	// A zero MaxElapsedTime means the policy retries forever.
	policy.MaxElapsedTime = 0
	// Never wait longer than 15 seconds between two attempts.
	policy.MaxInterval = 15 * time.Second

	// Reset after adjusting the settings so the policy starts from its initial state.
	policy.Reset()
	return policy
}

0 comments on commit bfcc333

Please sign in to comment.