Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Allow log streaming to take longer than build execution #390

Merged
merged 10 commits into from
Dec 9, 2022
12 changes: 12 additions & 0 deletions cmd/vela-worker/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package main

import (
"context"
"sync"
"time"

"github.com/go-vela/worker/executor"
Expand Down Expand Up @@ -75,6 +76,7 @@ func (w *Worker) exec(index int) error {
Driver: w.Config.Executor.Driver,
LogMethod: w.Config.Executor.LogMethod,
MaxLogSize: w.Config.Executor.MaxLogSize,
LogStreamingTimeout: w.Config.Executor.LogStreamingTimeout,
EnforceTrustedRepos: w.Config.Executor.EnforceTrustedRepos,
PrivilegedImages: w.Config.Runtime.PrivilegedImages,
Client: w.VelaClient,
Expand Down Expand Up @@ -108,7 +110,13 @@ func (w *Worker) exec(index int) error {
timeoutCtx, timeout := context.WithTimeout(buildCtx, t)
defer timeout()

// This WaitGroup delays calling DestroyBuild until the StreamBuild goroutine finishes.
var wg sync.WaitGroup

defer func() {
// if exec() exits before starting StreamBuild, this returns immediately.
wg.Wait()

logger.Info("destroying build")

// destroy the build with the executor (pass a background
Expand Down Expand Up @@ -137,8 +145,12 @@ func (w *Worker) exec(index int) error {
return nil
}

// add StreamBuild goroutine to WaitGroup
wg.Add(1)

// log/event streaming uses buildCtx so that it is not subject to the timeout.
go func() {
cognifloyd marked this conversation as resolved.
Show resolved Hide resolved
defer wg.Done()
logger.Info("streaming build logs")
// execute the build with the executor
err = _executor.StreamBuild(buildCtx)
Expand Down
1 change: 1 addition & 0 deletions cmd/vela-worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func run(c *cli.Context) error {
Driver: c.String("executor.driver"),
LogMethod: c.String("executor.log_method"),
MaxLogSize: c.Uint("executor.max_log_size"),
LogStreamingTimeout: c.Duration("executor.log_streaming_timeout"),
EnforceTrustedRepos: c.Bool("executor.enforce-trusted-repos"),
},
// logger configuration
Expand Down
8 changes: 8 additions & 0 deletions executor/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package executor

import (
"time"

"github.com/go-vela/types/constants"

"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -37,6 +39,12 @@ var Flags = []cli.Flag{
Name: "executor.max_log_size",
Usage: "maximum log size (in bytes)",
},
&cli.DurationFlag{
EnvVars: []string{"WORKER_LOG_STREAMING_TIMEOUT", "VELA_LOG_STREAMING_TIMEOUT", "LOG_STREAMING_TIMEOUT"},
Name: "executor.log_streaming_timeout",
Usage: "maximum amount of time to wait for log streaming after build completes",
Value: 5 * time.Minute,
},
&cli.BoolFlag{
EnvVars: []string{"VELA_EXECUTOR_ENFORCE_TRUSTED_REPOS", "EXECUTOR_ENFORCE_TRUSTED_REPOS"},
FilePath: "/vela/executor/enforce_trusted_repos",
Expand Down
12 changes: 10 additions & 2 deletions executor/linux/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"
"github.com/go-vela/worker/internal/build"
context2 "github.com/go-vela/worker/internal/context"
"github.com/go-vela/worker/internal/image"
"github.com/go-vela/worker/internal/step"
)
Expand Down Expand Up @@ -566,10 +567,15 @@ func (c *client) ExecBuild(ctx context.Context) error {
// StreamBuild receives a StreamRequest and then
// runs StreamService or StreamStep in a goroutine.
func (c *client) StreamBuild(ctx context.Context) error {
// cancel streaming after a timeout once the build has finished
delayedCtx, cancelStreaming := context2.
WithDelayedCancelPropagation(ctx, c.logStreamingTimeout, "streaming", c.Logger)
defer cancelStreaming()

// create an error group with the parent context
//
// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext
streams, streamCtx := errgroup.WithContext(ctx)
streams, streamCtx := errgroup.WithContext(delayedCtx)

defer func() {
c.Logger.Trace("waiting for stream functions to return")
Expand All @@ -579,6 +585,8 @@ func (c *client) StreamBuild(ctx context.Context) error {
c.Logger.Errorf("error in a stream request, %v", err)
}

cancelStreaming()

c.Logger.Info("all stream functions have returned")
}()

Expand Down Expand Up @@ -607,7 +615,7 @@ func (c *client) StreamBuild(ctx context.Context) error {

return nil
})
case <-ctx.Done():
case <-delayedCtx.Done():
c.Logger.Debug("streaming context canceled")
// build done or canceled
return nil
Expand Down
1 change: 1 addition & 0 deletions executor/linux/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1238,6 +1238,7 @@ func TestLinux_StreamBuild(t *testing.T) {
WithPipeline(_pipeline),
WithRepo(_repo),
WithRuntime(_runtime),
WithLogStreamingTimeout(1*time.Second),
WithUser(_user),
WithVelaClient(_client),
withStreamRequests(streamRequests),
Expand Down
2 changes: 2 additions & 0 deletions executor/linux/linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package linux
import (
"reflect"
"sync"
"time"

"github.com/go-vela/sdk-go/vela"
"github.com/go-vela/types/library"
Expand Down Expand Up @@ -34,6 +35,7 @@ type (
init *pipeline.Container
logMethod string
maxLogSize uint
logStreamingTimeout time.Duration
privilegedImages []string
enforceTrustedRepos bool
build *library.Build
Expand Down
13 changes: 13 additions & 0 deletions executor/linux/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package linux

import (
"fmt"
"time"

"github.com/go-vela/sdk-go/vela"
"github.com/go-vela/types/library"
Expand Down Expand Up @@ -64,6 +65,18 @@ func WithMaxLogSize(size uint) Opt {
}
}

// WithLogStreamingTimeout sets the log streaming timeout in the executor client for Linux.
func WithLogStreamingTimeout(timeout time.Duration) Opt {
return func(c *client) error {
c.Logger.Trace("configuring log streaming timeout in linux executor client")

// set the maximum log size in the client
c.logStreamingTimeout = timeout

return nil
}
}

// WithPrivilegedImages sets the privileged images in the executor client for Linux.
func WithPrivilegedImages(images []string) Opt {
return func(c *client) error {
Expand Down
41 changes: 41 additions & 0 deletions executor/linux/opts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http/httptest"
"reflect"
"testing"
"time"

"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -161,6 +162,46 @@ func TestLinux_Opt_WithMaxLogSize(t *testing.T) {
}
}

func TestLinux_Opt_WithLogStreamingTimeout(t *testing.T) {
// setup tests
tests := []struct {
name string
failure bool
logStreamingTimeout time.Duration
}{
{
name: "defined",
failure: false,
logStreamingTimeout: 1 * time.Second,
},
}

// run tests
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_engine, err := New(
WithLogStreamingTimeout(test.logStreamingTimeout),
)

if test.failure {
if err == nil {
t.Errorf("WithLogStreamingTimeout should have returned err")
}

return // continue to next test
}

if err != nil {
t.Errorf("WithLogStreamingTimeout returned err: %v", err)
}

if !reflect.DeepEqual(_engine.logStreamingTimeout, test.logStreamingTimeout) {
t.Errorf("WithLogStreamingTimeout is %v, want %v", _engine.logStreamingTimeout, test.logStreamingTimeout)
}
})
}
}

func TestLinux_Opt_WithPrivilegedImages(t *testing.T) {
// setup tests
tests := []struct {
Expand Down
5 changes: 5 additions & 0 deletions executor/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package executor
import (
"fmt"
"strings"
"time"

"github.com/go-vela/sdk-go/vela"

Expand Down Expand Up @@ -40,6 +41,9 @@ type Setup struct {
LogMethod string
// specifies the maximum log size
MaxLogSize uint
// specifies how long to wait after the build finishes
// for log streaming to complete
LogStreamingTimeout time.Duration
// specifies a list of privileged images to use
PrivilegedImages []string
// configuration for enforcing that only trusted repos may run privileged images
Expand Down Expand Up @@ -85,6 +89,7 @@ func (s *Setup) Linux() (Engine, error) {
linux.WithBuild(s.Build),
linux.WithLogMethod(s.LogMethod),
linux.WithMaxLogSize(s.MaxLogSize),
linux.WithLogStreamingTimeout(s.LogStreamingTimeout),
linux.WithPrivilegedImages(s.PrivilegedImages),
linux.WithEnforceTrustedRepos(s.EnforceTrustedRepos),
linux.WithHostname(s.Hostname),
Expand Down
2 changes: 2 additions & 0 deletions executor/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package executor
import (
"net/http/httptest"
"testing"
"time"

"github.com/gin-gonic/gin"
"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -79,6 +80,7 @@ func TestExecutor_Setup_Linux(t *testing.T) {
linux.WithBuild(_build),
linux.WithLogMethod("byte-chunks"),
linux.WithMaxLogSize(2097152),
linux.WithLogStreamingTimeout(1*time.Second),
linux.WithHostname("localhost"),
linux.WithPipeline(_pipeline),
linux.WithRepo(_repo),
Expand Down
49 changes: 49 additions & 0 deletions internal/context/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) 2022 Target Brands, Inc. All rights reserved.
//
// Use of this source code is governed by the LICENSE file in this repository.

package context

import (
"context"
"time"

"github.com/sirupsen/logrus"
)

func WithDelayedCancelPropagation(parent context.Context, timeout time.Duration, name string, logger *logrus.Entry) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())

go func() {
var timer *time.Timer

// start the timer once the parent context is canceled
select {
case <-parent.Done():
logger.Tracef("parent context is done, starting %s timer for %s", name, timeout)
timer = time.NewTimer(timeout)

break
case <-ctx.Done():
logger.Tracef("%s finished before the parent context", name)

return
}

// wait for the timer to elapse or the context to naturally finish.
select {
case <-timer.C:
logger.Tracef("%s timed out, propagating cancel to %s context", name, name)
cancel()

return
cognifloyd marked this conversation as resolved.
Show resolved Hide resolved
case <-ctx.Done():
logger.Tracef("%s finished, stopping timeout timer", name)
timer.Stop()

return
cognifloyd marked this conversation as resolved.
Show resolved Hide resolved
}
}()

return ctx, cancel
}
Loading