Skip to content

Commit

Permalink
enhance: Add WithDelayedCancelPropagation to implement logStreamingTi…
Browse files Browse the repository at this point in the history
…meout
  • Loading branch information
cognifloyd committed Oct 11, 2022
1 parent bb9d324 commit d6f801c
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 2 deletions.
11 changes: 9 additions & 2 deletions executor/linux/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/go-vela/types/constants"
"github.com/go-vela/worker/internal/build"
context2 "github.com/go-vela/worker/internal/context"
"github.com/go-vela/worker/internal/step"
)

Expand Down Expand Up @@ -496,10 +497,14 @@ func (c *client) ExecBuild(ctx context.Context) error {
// StreamBuild receives a StreamRequest and then
// runs StreamService or StreamStep in a goroutine.
func (c *client) StreamBuild(ctx context.Context) error {
// cancel streaming after a timeout once the build has finished
delayedCtx, cancelStreaming := context2.WithDelayedCancelPropagation(ctx, c.logStreamingTimeout)
defer cancelStreaming()

// create an error group with the parent context
//
// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext
streams, streamCtx := errgroup.WithContext(ctx)
streams, streamCtx := errgroup.WithContext(delayedCtx)

defer func() {
c.Logger.Trace("waiting for stream functions to return")
Expand All @@ -509,6 +514,8 @@ func (c *client) StreamBuild(ctx context.Context) error {
c.Logger.Errorf("error in a stream request, %v", err)
}

cancelStreaming()

c.Logger.Info("all stream functions have returned")
}()

Expand Down Expand Up @@ -537,7 +544,7 @@ func (c *client) StreamBuild(ctx context.Context) error {

return nil
})
case <-ctx.Done():
case <-delayedCtx.Done():
c.Logger.Debug("streaming context canceled")
// build done or canceled
return nil
Expand Down
1 change: 1 addition & 0 deletions executor/linux/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,7 @@ func TestLinux_StreamBuild(t *testing.T) {
WithPipeline(_pipeline),
WithRepo(_repo),
WithRuntime(_runtime),
WithLogStreamingTimeout(1*time.Second),
WithUser(_user),
WithVelaClient(_client),
withStreamRequests(streamRequests),
Expand Down
38 changes: 38 additions & 0 deletions internal/context/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) 2022 Target Brands, Inc. All rights reserved.
//
// Use of this source code is governed by the LICENSE file in this repository.

package context

import (
"context"
"time"
)

func WithDelayedCancelPropagation(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())

go func() {
var timer *time.Timer

// start the timer once the parent context is canceled
select {
case <-parent.Done():
timer = time.NewTimer(timeout)
case <-ctx.Done():
return
}

// wait for the timer to elapse or the context to naturally finish.
select {
case <-timer.C:
cancel()
return
case <-ctx.Done():
timer.Stop()
return
}
}()

return ctx, cancel
}
10 changes: 10 additions & 0 deletions internal/context/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Copyright (c) 2022 Target Brands, Inc. All rights reserved.
//
// Use of this source code is governed by the LICENSE file in this repository.

// Package context provides context utilities.
//
// Usage:
//
// import "github.com/go-vela/worker/internal/context"
package context

0 comments on commit d6f801c

Please sign in to comment.