Skip to content

Commit

Permalink
Use runContainerDone channel instead of runCtx context
Browse files Browse the repository at this point in the history
The purpose is a bit clearer this way
  • Loading branch information
cognifloyd committed Feb 26, 2022
1 parent bfd9c4f commit 9973e77
Show file tree
Hide file tree
Showing 15 changed files with 52 additions and 53 deletions.
4 changes: 2 additions & 2 deletions executor/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type Engine interface {
ExecService(context.Context, *pipeline.Container) error
// StreamService defines a function that
// tails the output for a service.
StreamService(context.Context, context.Context, *pipeline.Container) error
StreamService(context.Context, chan struct{}, *pipeline.Container) error
// DestroyService defines a function that
// cleans up the service after execution.
DestroyService(context.Context, *pipeline.Container) error
Expand Down Expand Up @@ -103,7 +103,7 @@ type Engine interface {
ExecStep(context.Context, *pipeline.Container) error
// StreamStep defines a function that
// tails the output for a step.
StreamStep(context.Context, context.Context, *pipeline.Container) error
StreamStep(context.Context, chan struct{}, *pipeline.Container) error
// DestroyStep defines a function that
// cleans up the step after execution.
DestroyStep(context.Context, *pipeline.Container) error
Expand Down
10 changes: 5 additions & 5 deletions executor/linux/secret.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,12 @@ func (s *secretSvc) exec(ctx context.Context, p *pipeline.SecretSlice) error {
// Docker runtime needs to wait to tail logs until after RunContainer.
// Kubernetes runtime needs to start tailing logs before RunContainer.
// runContainerDone will let Runtime.TailContainer know when RunContainer has finished.
runCtx, runContainerDone := context.WithCancel(context.Background())
runContainerDone := make(chan struct{})

go func() {
logger.Debug("stream logs for container")
// stream logs from container
err = s.client.secret.stream(ctx, runCtx, _secret.Origin)
err = s.client.secret.stream(ctx, runContainerDone, _secret.Origin)
if err != nil {
logger.Error(err)
}
Expand All @@ -146,7 +146,7 @@ func (s *secretSvc) exec(ctx context.Context, p *pipeline.SecretSlice) error {
// run the runtime container
err := s.client.Runtime.RunContainer(ctx, _secret.Origin, s.client.pipeline)
// Tell Runtime.TailContainer that RunContainer is done.
runContainerDone()
close(runContainerDone)
if err != nil {
return err
}
Expand Down Expand Up @@ -257,7 +257,7 @@ func (s *secretSvc) pull(secret *pipeline.Secret) (*library.Secret, error) {
}

// stream tails the output for a secret plugin.
func (s *secretSvc) stream(ctx context.Context, runCtx context.Context, ctn *pipeline.Container) error {
func (s *secretSvc) stream(ctx context.Context, runContainerDone chan struct{}, ctn *pipeline.Container) error {
// stream all the logs to the init step
_log, err := step.LoadLogs(s.client.init, &s.client.stepLogs)
if err != nil {
Expand Down Expand Up @@ -295,7 +295,7 @@ func (s *secretSvc) stream(ctx context.Context, runCtx context.Context, ctn *pip

logger.Debug("tailing container")
// tail the runtime container
rc, err := s.client.Runtime.TailContainer(ctx, runCtx, ctn)
rc, err := s.client.Runtime.TailContainer(ctx, runContainerDone, ctn)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions executor/linux/secret_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,8 +522,8 @@ func TestLinux_Secret_stream(t *testing.T) {
},
},
}
runCtx, runContainerDone := context.WithCancel(context.Background())
runContainerDone()
runContainerDone := make(chan struct{})
close(runContainerDone)

// run tests
for _, test := range tests {
Expand All @@ -542,7 +542,7 @@ func TestLinux_Secret_stream(t *testing.T) {
// add init container info to client
_ = _engine.CreateBuild(context.Background())

err = _engine.secret.stream(context.Background(), runCtx, test.container)
err = _engine.secret.stream(context.Background(), runContainerDone, test.container)

if test.failure {
if err == nil {
Expand Down
12 changes: 6 additions & 6 deletions executor/linux/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (c *client) ExecService(ctx context.Context, ctn *pipeline.Container) error
// Docker runtime needs to wait to tail logs until after RunContainer.
// Kubernetes runtime needs to start tailing logs before RunContainer.
// runContainerDone will let Runtime.TailContainer know when RunContainer has finished.
runCtx, runContainerDone := context.WithCancel(context.Background())
runContainerDone := make(chan struct{})

// create an error group with the parent context
//
Expand All @@ -149,7 +149,7 @@ func (c *client) ExecService(ctx context.Context, ctn *pipeline.Container) error
logs.Go(func() error {
logger.Debug("streaming logs for container")
// stream logs from container
err := c.StreamService(logCtx, runCtx, ctn)
err := c.StreamService(logCtx, runContainerDone, ctn)
if err != nil {
logger.Error(err)
}
Expand All @@ -161,7 +161,7 @@ func (c *client) ExecService(ctx context.Context, ctn *pipeline.Container) error
// run the runtime container
err = c.Runtime.RunContainer(ctx, ctn, c.pipeline)
// Tell Runtime.TailContainer that RunContainer is done.
runContainerDone()
close(runContainerDone)
if err != nil {
return err
}
Expand All @@ -172,7 +172,7 @@ func (c *client) ExecService(ctx context.Context, ctn *pipeline.Container) error
// StreamService tails the output for a service.
//
// nolint: funlen // ignore function length
func (c *client) StreamService(ctx context.Context, runCtx context.Context, ctn *pipeline.Container) error {
func (c *client) StreamService(ctx context.Context, runContainerDone chan struct{}, ctn *pipeline.Container) error {
// update engine logger with service metadata
//
// https://pkg.go.dev/github.com/sirupsen/logrus?tab=doc#Entry.WithField
Expand All @@ -188,7 +188,7 @@ func (c *client) StreamService(ctx context.Context, runCtx context.Context, ctn

defer func() {
// tail the runtime container
rc, err := c.Runtime.TailContainer(ctx, runCtx, ctn)
rc, err := c.Runtime.TailContainer(ctx, runContainerDone, ctn)
if err != nil {
logger.Errorf("unable to tail container output for upload: %v", err)

Expand Down Expand Up @@ -228,7 +228,7 @@ func (c *client) StreamService(ctx context.Context, runCtx context.Context, ctn

logger.Debug("tailing container")
// tail the runtime container
rc, err := c.Runtime.TailContainer(ctx, runCtx, ctn)
rc, err := c.Runtime.TailContainer(ctx, runContainerDone, ctn)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions executor/linux/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,8 @@ func TestLinux_StreamService(t *testing.T) {
container: new(pipeline.Container),
},
}
runCtx, runContainerDone := context.WithCancel(context.Background())
runContainerDone()
runContainerDone := make(chan struct{})
close(runContainerDone)

// run tests
for _, test := range tests {
Expand All @@ -373,7 +373,7 @@ func TestLinux_StreamService(t *testing.T) {
_engine.serviceLogs.Store(test.container.ID, new(library.Log))
}

err = _engine.StreamService(context.Background(), runCtx, test.container)
err = _engine.StreamService(context.Background(), runContainerDone, test.container)

if test.failure {
if err == nil {
Expand Down
12 changes: 6 additions & 6 deletions executor/linux/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (c *client) ExecStep(ctx context.Context, ctn *pipeline.Container) error {
// Docker runtime needs to wait to tail logs until after RunContainer.
// Kubernetes runtime needs to start tailing logs before RunContainer.
// runContainerDone will let Runtime.TailContainer know when RunContainer has finished.
runCtx, runContainerDone := context.WithCancel(context.Background())
runContainerDone := make(chan struct{})

// create an error group with the parent context
//
Expand All @@ -161,7 +161,7 @@ func (c *client) ExecStep(ctx context.Context, ctn *pipeline.Container) error {
logs.Go(func() error {
logger.Debug("streaming logs for container")
// stream logs from container
err := c.StreamStep(logCtx, runCtx, ctn)
err := c.StreamStep(logCtx, runContainerDone, ctn)
if err != nil {
logger.Error(err)
}
Expand All @@ -173,7 +173,7 @@ func (c *client) ExecStep(ctx context.Context, ctn *pipeline.Container) error {
// run the runtime container
err = c.Runtime.RunContainer(ctx, ctn, c.pipeline)
// Tell Runtime.TailContainer that RunContainer is done.
runContainerDone()
close(runContainerDone)
if err != nil {
return err
}
Expand Down Expand Up @@ -203,7 +203,7 @@ func (c *client) ExecStep(ctx context.Context, ctn *pipeline.Container) error {
// StreamStep tails the output for a step.
//
// nolint: funlen // ignore function length
func (c *client) StreamStep(ctx context.Context, runCtx context.Context, ctn *pipeline.Container) error {
func (c *client) StreamStep(ctx context.Context, runContainerDone chan struct{}, ctn *pipeline.Container) error {
// TODO: remove hardcoded reference
if ctn.Name == "init" {
return nil
Expand All @@ -226,7 +226,7 @@ func (c *client) StreamStep(ctx context.Context, runCtx context.Context, ctn *pi

defer func() {
// tail the runtime container
rc, err := c.Runtime.TailContainer(ctx, runCtx, ctn)
rc, err := c.Runtime.TailContainer(ctx, runContainerDone, ctn)
if err != nil {
logger.Errorf("unable to tail container output for upload: %v", err)

Expand Down Expand Up @@ -271,7 +271,7 @@ func (c *client) StreamStep(ctx context.Context, runCtx context.Context, ctn *pi

logger.Debug("tailing container")
// tail the runtime container
rc, err := c.Runtime.TailContainer(ctx, runCtx, ctn)
rc, err := c.Runtime.TailContainer(ctx, runContainerDone, ctn)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions executor/linux/step_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,8 @@ func TestLinux_StreamStep(t *testing.T) {
container: new(pipeline.Container),
},
}
runCtx, runContainerDone := context.WithCancel(context.Background())
runContainerDone()
runContainerDone := make(chan struct{})
close(runContainerDone)

// run tests
for _, test := range tests {
Expand All @@ -419,7 +419,7 @@ func TestLinux_StreamStep(t *testing.T) {
_engine.stepLogs.Store(test.container.ID, new(library.Log))
}

err = _engine.StreamStep(context.Background(), runCtx, test.container)
err = _engine.StreamStep(context.Background(), runContainerDone, test.container)

if test.failure {
if err == nil {
Expand Down
10 changes: 5 additions & 5 deletions executor/local/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ func (c *client) ExecService(ctx context.Context, ctn *pipeline.Container) error
// Docker runtime needs to wait to tail logs until after RunContainer.
// Kubernetes runtime needs to start tailing logs before RunContainer.
// runContainerDone will let Runtime.TailContainer know when RunContainer has finished.
runCtx, runContainerDone := context.WithCancel(context.Background())
runContainerDone := make(chan struct{})

go func() {
// stream logs from container
err := c.StreamService(context.Background(), runCtx, ctn)
err := c.StreamService(context.Background(), runContainerDone, ctn)
if err != nil {
fmt.Fprintln(os.Stdout, "unable to stream logs for service:", err)
}
Expand All @@ -106,7 +106,7 @@ func (c *client) ExecService(ctx context.Context, ctn *pipeline.Container) error
// run the runtime container
err = c.Runtime.RunContainer(ctx, ctn, c.pipeline)
// Tell Runtime.TailContainer that RunContainer is done.
runContainerDone()
close(runContainerDone)
if err != nil {
return err
}
Expand All @@ -115,9 +115,9 @@ func (c *client) ExecService(ctx context.Context, ctn *pipeline.Container) error
}

// StreamService tails the output for a service.
func (c *client) StreamService(ctx context.Context, runCtx context.Context, ctn *pipeline.Container) error {
func (c *client) StreamService(ctx context.Context, runContainerDone chan struct{}, ctn *pipeline.Container) error {
// tail the runtime container
rc, err := c.Runtime.TailContainer(ctx, runCtx, ctn)
rc, err := c.Runtime.TailContainer(ctx, runContainerDone, ctn)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions executor/local/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ func TestLocal_StreamService(t *testing.T) {
container: new(pipeline.Container),
},
}
runCtx, runContainerDone := context.WithCancel(context.Background())
runContainerDone()
runContainerDone := make(chan struct{})
close(runContainerDone)

// run tests
for _, test := range tests {
Expand All @@ -292,7 +292,7 @@ func TestLocal_StreamService(t *testing.T) {
t.Errorf("unable to create executor engine: %v", err)
}

err = _engine.StreamService(context.Background(), runCtx, test.container)
err = _engine.StreamService(context.Background(), runContainerDone, test.container)

if test.failure {
if err == nil {
Expand Down
10 changes: 5 additions & 5 deletions executor/local/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ func (c *client) ExecStep(ctx context.Context, ctn *pipeline.Container) error {
// Docker runtime needs to wait to tail logs until after RunContainer.
// Kubernetes runtime needs to start tailing logs before RunContainer.
// runContainerDone will let Runtime.TailContainer know when RunContainer has finished.
runCtx, runContainerDone := context.WithCancel(context.Background())
runContainerDone := make(chan struct{})

go func() {
// stream logs from container
err := c.StreamStep(context.Background(), runCtx, ctn)
err := c.StreamStep(context.Background(), runContainerDone, ctn)
if err != nil {
// TODO: Should this be changed or removed?
fmt.Println(err)
Expand All @@ -114,7 +114,7 @@ func (c *client) ExecStep(ctx context.Context, ctn *pipeline.Container) error {
// run the runtime container
err = c.Runtime.RunContainer(ctx, ctn, c.pipeline)
// Tell Runtime.TailContainer that RunContainer is done.
runContainerDone()
close(runContainerDone)
if err != nil {
return err
}
Expand All @@ -140,14 +140,14 @@ func (c *client) ExecStep(ctx context.Context, ctn *pipeline.Container) error {
}

// StreamStep tails the output for a step.
func (c *client) StreamStep(ctx context.Context, runCtx context.Context, ctn *pipeline.Container) error {
func (c *client) StreamStep(ctx context.Context, runContainerDone chan struct{}, ctn *pipeline.Container) error {
// TODO: remove hardcoded reference
if ctn.Name == "init" {
return nil
}

// tail the runtime container
rc, err := c.Runtime.TailContainer(ctx, runCtx, ctn)
rc, err := c.Runtime.TailContainer(ctx, runContainerDone, ctn)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions executor/local/step_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,8 @@ func TestLocal_StreamStep(t *testing.T) {
container: new(pipeline.Container),
},
}
runCtx, runContainerDone := context.WithCancel(context.Background())
runContainerDone()
runContainerDone := make(chan struct{})
close(runContainerDone)

// run tests
for _, test := range tests {
Expand All @@ -341,7 +341,7 @@ func TestLocal_StreamStep(t *testing.T) {
t.Errorf("unable to create executor engine: %v", err)
}

err = _engine.StreamStep(context.Background(), runCtx, test.container)
err = _engine.StreamStep(context.Background(), runContainerDone, test.container)

if test.failure {
if err == nil {
Expand Down
7 changes: 3 additions & 4 deletions runtime/docker/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,13 @@ func (c *client) SetupContainer(ctx context.Context, ctn *pipeline.Container) er
// TailContainer captures the logs for the pipeline container.
//
// nolint: lll // ignore long line length due to variable names
func (c *client) TailContainer(ctx context.Context, runCtx context.Context, ctn *pipeline.Container) (io.ReadCloser, error) {
func (c *client) TailContainer(ctx context.Context, runContainerDone chan struct{}, ctn *pipeline.Container) (io.ReadCloser, error) {
c.Logger.Tracef("tailing output for container %s", ctn.ID)

// Tailing starts before RunContainer. For Docker, we wait until RunContainer has finished.
select {
case <-runCtx.Done():
break
for range runContainerDone {
}

// create options for capturing container logs
//
// https://godoc.org/github.com/docker/docker/api/types#ContainerLogsOptions
Expand Down
6 changes: 3 additions & 3 deletions runtime/docker/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,12 +329,12 @@ func TestDocker_TailContainer(t *testing.T) {
container: new(pipeline.Container),
},
}
runCtx, runContainerDone := context.WithCancel(context.Background())
runContainerDone()
runContainerDone := make(chan struct{})
close(runContainerDone)

// run tests
for _, test := range tests {
_, err = _engine.TailContainer(context.Background(), runCtx, test.container)
_, err = _engine.TailContainer(context.Background(), runContainerDone, test.container)

if test.failure {
if err == nil {
Expand Down
2 changes: 1 addition & 1 deletion runtime/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Engine interface {
SetupContainer(context.Context, *pipeline.Container) error
// TailContainer defines a function that captures
// the logs on the pipeline container.
TailContainer(context.Context, context.Context, *pipeline.Container) (io.ReadCloser, error)
TailContainer(context.Context, chan struct{}, *pipeline.Container) (io.ReadCloser, error)
// WaitContainer defines a function that blocks
// until the pipeline container completes.
WaitContainer(context.Context, *pipeline.Container) error
Expand Down
2 changes: 1 addition & 1 deletion runtime/kubernetes/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (c *client) setupContainerEnvironment(ctn *pipeline.Container) error {
// TailContainer captures the logs for the pipeline container.
//
// nolint: lll // ignore long line length due to variable names
func (c *client) TailContainer(ctx context.Context, runCtx context.Context, ctn *pipeline.Container) (io.ReadCloser, error) {
func (c *client) TailContainer(ctx context.Context, runCtx chan struct{}, ctn *pipeline.Container) (io.ReadCloser, error) {
c.Logger.Tracef("tailing output for container %s", ctn.ID)

// create object to store container logs
Expand Down

0 comments on commit 9973e77

Please sign in to comment.