diff --git a/executor/linux/build.go b/executor/linux/build.go index 8f01fe5d..e465a7ec 100644 --- a/executor/linux/build.go +++ b/executor/linux/build.go @@ -15,6 +15,7 @@ import ( "github.com/go-vela/types/constants" "github.com/go-vela/worker/internal/build" + context2 "github.com/go-vela/worker/internal/context" "github.com/go-vela/worker/internal/step" ) @@ -496,10 +497,14 @@ func (c *client) ExecBuild(ctx context.Context) error { // StreamBuild receives a StreamRequest and then // runs StreamService or StreamStep in a goroutine. func (c *client) StreamBuild(ctx context.Context) error { + // cancel streaming after a timeout once the build has finished + delayedCtx, cancelStreaming := context2.WithDelayedCancelPropagation(ctx, c.logStreamingTimeout) + defer cancelStreaming() + // create an error group with the parent context // // https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext - streams, streamCtx := errgroup.WithContext(ctx) + streams, streamCtx := errgroup.WithContext(delayedCtx) defer func() { c.Logger.Trace("waiting for stream functions to return") @@ -509,6 +514,8 @@ func (c *client) StreamBuild(ctx context.Context) error { c.Logger.Errorf("error in a stream request, %v", err) } + cancelStreaming() + c.Logger.Info("all stream functions have returned") }() @@ -537,7 +544,7 @@ func (c *client) StreamBuild(ctx context.Context) error { return nil }) - case <-ctx.Done(): + case <-delayedCtx.Done(): c.Logger.Debug("streaming context canceled") // build done or canceled return nil diff --git a/executor/linux/build_test.go b/executor/linux/build_test.go index 5422a951..35584963 100644 --- a/executor/linux/build_test.go +++ b/executor/linux/build_test.go @@ -715,6 +715,7 @@ func TestLinux_StreamBuild(t *testing.T) { WithPipeline(_pipeline), WithRepo(_repo), WithRuntime(_runtime), + WithLogStreamingTimeout(1*time.Second), WithUser(_user), WithVelaClient(_client), withStreamRequests(streamRequests), diff --git a/internal/context/context.go b/internal/context/context.go new file mode 100644 index 00000000..7c85f9b9 --- /dev/null +++ b/internal/context/context.go @@ -0,0 +1,38 @@ +// Copyright (c) 2022 Target Brands, Inc. All rights reserved. +// +// Use of this source code is governed by the LICENSE file in this repository. + +package context + +import ( + "context" + "time" +) + +func WithDelayedCancelPropagation(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) + + go func() { + var timer *time.Timer + + // start the timer once the parent context is canceled + select { + case <-parent.Done(): + timer = time.NewTimer(timeout) + case <-ctx.Done(): + return + } + + // wait for the timer to elapse or the context to naturally finish. + select { + case <-timer.C: + cancel() + return + case <-ctx.Done(): + timer.Stop() + return + } + }() + + return ctx, cancel +} diff --git a/internal/context/doc.go b/internal/context/doc.go new file mode 100644 index 00000000..c7c2f795 --- /dev/null +++ b/internal/context/doc.go @@ -0,0 +1,10 @@ +// Copyright (c) 2022 Target Brands, Inc. All rights reserved. +// +// Use of this source code is governed by the LICENSE file in this repository. + +// Package context provides context utilities. +// +// Usage: +// +// import "github.com/go-vela/worker/internal/context" +package context