Skip to content

Commit

Permalink
fix: Allow log streaming to take longer than build execution (#390)
Browse files Browse the repository at this point in the history
  • Loading branch information
cognifloyd committed Dec 9, 2022
1 parent 27fb3a9 commit b50e19f
Show file tree
Hide file tree
Showing 13 changed files with 310 additions and 2 deletions.
12 changes: 12 additions & 0 deletions cmd/vela-worker/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package main

import (
"context"
"sync"
"time"

"github.com/go-vela/worker/executor"
Expand Down Expand Up @@ -75,6 +76,7 @@ func (w *Worker) exec(index int) error {
Driver: w.Config.Executor.Driver,
LogMethod: w.Config.Executor.LogMethod,
MaxLogSize: w.Config.Executor.MaxLogSize,
LogStreamingTimeout: w.Config.Executor.LogStreamingTimeout,
EnforceTrustedRepos: w.Config.Executor.EnforceTrustedRepos,
PrivilegedImages: w.Config.Runtime.PrivilegedImages,
Client: w.VelaClient,
Expand Down Expand Up @@ -108,7 +110,13 @@ func (w *Worker) exec(index int) error {
timeoutCtx, timeout := context.WithTimeout(buildCtx, t)
defer timeout()

// This WaitGroup delays calling DestroyBuild until the StreamBuild goroutine finishes.
var wg sync.WaitGroup

defer func() {
// if exec() exits before starting StreamBuild, this returns immediately.
wg.Wait()

logger.Info("destroying build")

// destroy the build with the executor (pass a background
Expand Down Expand Up @@ -137,8 +145,12 @@ func (w *Worker) exec(index int) error {
return nil
}

// add StreamBuild goroutine to WaitGroup
wg.Add(1)

// log/event streaming uses buildCtx so that it is not subject to the timeout.
go func() {
defer wg.Done()
logger.Info("streaming build logs")
// execute the build with the executor
err = _executor.StreamBuild(buildCtx)
Expand Down
1 change: 1 addition & 0 deletions cmd/vela-worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func run(c *cli.Context) error {
Driver: c.String("executor.driver"),
LogMethod: c.String("executor.log_method"),
MaxLogSize: c.Uint("executor.max_log_size"),
LogStreamingTimeout: c.Duration("executor.log_streaming_timeout"),
EnforceTrustedRepos: c.Bool("executor.enforce-trusted-repos"),
},
// logger configuration
Expand Down
8 changes: 8 additions & 0 deletions executor/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package executor

import (
"time"

"github.com/go-vela/types/constants"

"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -37,6 +39,12 @@ var Flags = []cli.Flag{
Name: "executor.max_log_size",
Usage: "maximum log size (in bytes)",
},
&cli.DurationFlag{
EnvVars: []string{"WORKER_LOG_STREAMING_TIMEOUT", "VELA_LOG_STREAMING_TIMEOUT", "LOG_STREAMING_TIMEOUT"},
Name: "executor.log_streaming_timeout",
Usage: "maximum amount of time to wait for log streaming after build completes",
Value: 5 * time.Minute,
},
&cli.BoolFlag{
EnvVars: []string{"VELA_EXECUTOR_ENFORCE_TRUSTED_REPOS", "EXECUTOR_ENFORCE_TRUSTED_REPOS"},
FilePath: "/vela/executor/enforce_trusted_repos",
Expand Down
12 changes: 10 additions & 2 deletions executor/linux/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"
"github.com/go-vela/worker/internal/build"
context2 "github.com/go-vela/worker/internal/context"
"github.com/go-vela/worker/internal/image"
"github.com/go-vela/worker/internal/step"
)
Expand Down Expand Up @@ -566,10 +567,15 @@ func (c *client) ExecBuild(ctx context.Context) error {
// StreamBuild receives a StreamRequest and then
// runs StreamService or StreamStep in a goroutine.
func (c *client) StreamBuild(ctx context.Context) error {
// cancel streaming after a timeout once the build has finished
delayedCtx, cancelStreaming := context2.
WithDelayedCancelPropagation(ctx, c.logStreamingTimeout, "streaming", c.Logger)
defer cancelStreaming()

// create an error group with the parent context
//
// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext
streams, streamCtx := errgroup.WithContext(ctx)
streams, streamCtx := errgroup.WithContext(delayedCtx)

defer func() {
c.Logger.Trace("waiting for stream functions to return")
Expand All @@ -579,6 +585,8 @@ func (c *client) StreamBuild(ctx context.Context) error {
c.Logger.Errorf("error in a stream request, %v", err)
}

cancelStreaming()

c.Logger.Info("all stream functions have returned")
}()

Expand Down Expand Up @@ -607,7 +615,7 @@ func (c *client) StreamBuild(ctx context.Context) error {

return nil
})
case <-ctx.Done():
case <-delayedCtx.Done():
c.Logger.Debug("streaming context canceled")
// build done or canceled
return nil
Expand Down
1 change: 1 addition & 0 deletions executor/linux/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1238,6 +1238,7 @@ func TestLinux_StreamBuild(t *testing.T) {
WithPipeline(_pipeline),
WithRepo(_repo),
WithRuntime(_runtime),
WithLogStreamingTimeout(1*time.Second),
WithUser(_user),
WithVelaClient(_client),
withStreamRequests(streamRequests),
Expand Down
2 changes: 2 additions & 0 deletions executor/linux/linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package linux
import (
"reflect"
"sync"
"time"

"github.com/go-vela/sdk-go/vela"
"github.com/go-vela/types/library"
Expand Down Expand Up @@ -34,6 +35,7 @@ type (
init *pipeline.Container
logMethod string
maxLogSize uint
logStreamingTimeout time.Duration
privilegedImages []string
enforceTrustedRepos bool
build *library.Build
Expand Down
13 changes: 13 additions & 0 deletions executor/linux/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package linux

import (
"fmt"
"time"

"github.com/go-vela/sdk-go/vela"
"github.com/go-vela/types/library"
Expand Down Expand Up @@ -64,6 +65,18 @@ func WithMaxLogSize(size uint) Opt {
}
}

// WithLogStreamingTimeout sets the log streaming timeout in the executor client for Linux.
func WithLogStreamingTimeout(timeout time.Duration) Opt {
return func(c *client) error {
c.Logger.Trace("configuring log streaming timeout in linux executor client")

// set the maximum log size in the client
c.logStreamingTimeout = timeout

return nil
}
}

// WithPrivilegedImages sets the privileged images in the executor client for Linux.
func WithPrivilegedImages(images []string) Opt {
return func(c *client) error {
Expand Down
41 changes: 41 additions & 0 deletions executor/linux/opts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http/httptest"
"reflect"
"testing"
"time"

"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -161,6 +162,46 @@ func TestLinux_Opt_WithMaxLogSize(t *testing.T) {
}
}

func TestLinux_Opt_WithLogStreamingTimeout(t *testing.T) {
// setup tests
tests := []struct {
name string
failure bool
logStreamingTimeout time.Duration
}{
{
name: "defined",
failure: false,
logStreamingTimeout: 1 * time.Second,
},
}

// run tests
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_engine, err := New(
WithLogStreamingTimeout(test.logStreamingTimeout),
)

if test.failure {
if err == nil {
t.Errorf("WithLogStreamingTimeout should have returned err")
}

return // continue to next test
}

if err != nil {
t.Errorf("WithLogStreamingTimeout returned err: %v", err)
}

if !reflect.DeepEqual(_engine.logStreamingTimeout, test.logStreamingTimeout) {
t.Errorf("WithLogStreamingTimeout is %v, want %v", _engine.logStreamingTimeout, test.logStreamingTimeout)
}
})
}
}

func TestLinux_Opt_WithPrivilegedImages(t *testing.T) {
// setup tests
tests := []struct {
Expand Down
5 changes: 5 additions & 0 deletions executor/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package executor
import (
"fmt"
"strings"
"time"

"github.com/go-vela/sdk-go/vela"

Expand Down Expand Up @@ -40,6 +41,9 @@ type Setup struct {
LogMethod string
// specifies the maximum log size
MaxLogSize uint
// specifies how long to wait after the build finishes
// for log streaming to complete
LogStreamingTimeout time.Duration
// specifies a list of privileged images to use
PrivilegedImages []string
// configuration for enforcing that only trusted repos may run privileged images
Expand Down Expand Up @@ -85,6 +89,7 @@ func (s *Setup) Linux() (Engine, error) {
linux.WithBuild(s.Build),
linux.WithLogMethod(s.LogMethod),
linux.WithMaxLogSize(s.MaxLogSize),
linux.WithLogStreamingTimeout(s.LogStreamingTimeout),
linux.WithPrivilegedImages(s.PrivilegedImages),
linux.WithEnforceTrustedRepos(s.EnforceTrustedRepos),
linux.WithHostname(s.Hostname),
Expand Down
2 changes: 2 additions & 0 deletions executor/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package executor
import (
"net/http/httptest"
"testing"
"time"

"github.com/gin-gonic/gin"
"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -79,6 +80,7 @@ func TestExecutor_Setup_Linux(t *testing.T) {
linux.WithBuild(_build),
linux.WithLogMethod("byte-chunks"),
linux.WithMaxLogSize(2097152),
linux.WithLogStreamingTimeout(1*time.Second),
linux.WithHostname("localhost"),
linux.WithPipeline(_pipeline),
linux.WithRepo(_repo),
Expand Down
49 changes: 49 additions & 0 deletions internal/context/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) 2022 Target Brands, Inc. All rights reserved.
//
// Use of this source code is governed by the LICENSE file in this repository.

package context

import (
"context"
"time"

"github.com/sirupsen/logrus"
)

func WithDelayedCancelPropagation(parent context.Context, timeout time.Duration, name string, logger *logrus.Entry) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())

go func() {
var timer *time.Timer

// start the timer once the parent context is canceled
select {
case <-parent.Done():
logger.Tracef("parent context is done, starting %s timer for %s", name, timeout)
timer = time.NewTimer(timeout)

break
case <-ctx.Done():
logger.Tracef("%s finished before the parent context", name)

return
}

// wait for the timer to elapse or the context to naturally finish.
select {
case <-timer.C:
logger.Tracef("%s timed out, propagating cancel to %s context", name, name)
cancel()

return
case <-ctx.Done():
logger.Tracef("%s finished, stopping timeout timer", name)
timer.Stop()

return
}
}()

return ctx, cancel
}
Loading

0 comments on commit b50e19f

Please sign in to comment.