Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition around supervisor's Commander #291

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 28 additions & 9 deletions internal/examples/supervisor/supervisor/commander/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"os"
"os/exec"
"sync"
"sync/atomic"
"syscall"
"time"
Expand All @@ -23,7 +24,11 @@ type Commander struct {
cmd *exec.Cmd
doneCh chan struct{}
waitCh chan struct{}
running int64
running atomic.Bool

// True when stopping is in progress.
isStoppingFlag bool
isStoppingMutex sync.RWMutex
}

func NewCommander(logger types.Logger, cfg *config.Agent, args ...string) (*Commander, error) {
Expand All @@ -41,6 +46,10 @@ func NewCommander(logger types.Logger, cfg *config.Agent, args ...string) (*Comm
// Start the Agent and begin watching the process.
// Agent's stdout and stderr are written to a file.
func (c *Commander) Start(ctx context.Context) error {
if c.IsStopping() {
return nil
}

c.logger.Debugf(ctx, "Starting agent %s", c.cfg.Executable)

logFilePath := "agent.log"
Expand All @@ -63,7 +72,7 @@ func (c *Commander) Start(ctx context.Context) error {
}

c.logger.Debugf(ctx, "Agent process started, PID=%d", c.cmd.Process.Pid)
atomic.StoreInt64(&c.running, 1)
c.running.Store(true)

go c.watch()

Expand All @@ -83,7 +92,7 @@ func (c *Commander) Restart(ctx context.Context) error {
func (c *Commander) watch() {
c.cmd.Wait()
c.doneCh <- struct{}{}
atomic.StoreInt64(&c.running, 0)
c.running.Store(false)
close(c.waitCh)
}

Expand All @@ -94,33 +103,36 @@ func (c *Commander) Done() <-chan struct{} {

// Pid returns Agent process PID if it is started or 0 if it is not.
func (c *Commander) Pid() int {
if c.cmd == nil || c.cmd.Process == nil {
if !c.IsRunning() {
return 0
}
return c.cmd.Process.Pid
}

// ExitCode returns Agent process exit code if it exited or 0 if it is not.
func (c *Commander) ExitCode() int {
if c.cmd == nil || c.cmd.ProcessState == nil {
if c.IsRunning() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I've actually flipped the condition from what it was before, as per the method's docstring.

Copy link
Member

@srikanthccv srikanthccv Jul 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this potentially leave the agent process running in some cases? The NewSupervisor method starts a goroutine runAgentProcess to launch the agent process. There's a possibility that Shutdown might be called before c.cmd.Start() finishes. If c.cmd.Start() completes successfully, the process would keep running, which isn't expected.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, let me think about this (and if the previous code prevented that from happening). We could potentially store the running value a little earlier to avoid that IIUC.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there, sorry for taking so long to get back to you. As far as I can tell from the code, this was already the case and it isn't a regression from this PR.

To avoid this, we'd make it the caller's responsibility to call only call Stop() after IsRunning() returns true (for example having a backoff mechanism for properly cleaning up resources), or add more explicit synchronization like we do in clientcommon

// True when stopping is in progress.
isStoppingFlag bool
isStoppingMutex sync.RWMutex

IMO this is a separate issue than the race condition the one's fixed here, so tackling the cleanup of resoueces might be a new discussion. WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think it was possible to leave the process running in the background because we are checking if c.cmd is non-nil (which is the source of the race condition) instead of running status. I think the Stop should be updated to handle this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the delay.

Here is the case I think this still doesn't address

  1. Start() is called
  2. The process begins starting up but hasn't yet reached the point where c.running.Store(true) is executed
  3. Shutdown() is called during this window
  4. Since !c.IsRunning() evaluates to true at this point, Shutdown() returns immediately
  5. This could result in the process continuing to run

Does that make sense?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI I've pushed another commit in the Stop method to set the isStoppingFlag immediately.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, let me try to get this right.

The commander's Start is called only in two cases
a) In the Supervisor's startAgent.
b) During a restart (not pertinent to our discussion)

The startAgent method is only called via the runAgentProcess; in turn, this is only called as a new goroutine in the NewSupervisor function.

In the Shutdown method, the Commander's Stop sets the isStopping flag.

So let's say that NewSupervisor starts, and the new goroutine is launched.

Immediately, the commander's Shutdown method is called, to call the commander's Stop and set the isStoppingFlag. When the call stack gets to the commander's Start it will immediately exit with the IsStopping method without having a chance to launch any process.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @srikanthccv I've put out a test in f135e3b to try and verify the behavior we want.

I couldn't find any more elegant way than the ugly DelayLogger, but I'd like to see what you think. (This test fails in main, without my fix here).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So let's say that NewSupervisor starts, and the new goroutine is launched.

Immediately, the commander's Shutdown method is called, to call the commander's Stop and set the isStoppingFlag. When the call stack gets to the commander's Start it will immediately exit with the IsStopping method without having a chance to launch any process.

Take this case

NewSupervisor starts, and the goroutine runAgentProcess is launched, -> startAgent -> s.commander.Start -> the execution reaches c.cmd.Start

if err := c.cmd.Start(); err != nil {

And say cmd.Start didn't return yet

Now, assume the commander's Shutdown method is called (c.cmd.Start still didn't return) so c.running is false. In this flow isStoppingFlag is not helping because we would still return as IsRunning evaluates to true.

I think if we add a waitgroup before the c.cmd.Start and wait in shutdown should solve the issue.

return 0
}
return c.cmd.ProcessState.ExitCode()
}

func (c *Commander) IsRunning() bool {
return atomic.LoadInt64(&c.running) != 0
return c.running.Load()
}

// Stop the Agent process. Sends SIGTERM to the process and wait for up 10 seconds
// and if the process does not finish kills it forcedly by sending SIGKILL.
// Returns after the process is terminated.
func (c *Commander) Stop(ctx context.Context) error {
if c.cmd == nil || c.cmd.Process == nil {
c.isStoppingMutex.Lock()
c.isStoppingFlag = true
c.isStoppingMutex.Unlock()

if !c.IsRunning() {
// Not started, nothing to do.
return nil
}

c.logger.Debugf(ctx, "Stopping agent process, PID=%v", c.cmd.Process.Pid)

// Gracefully signal process to stop.
Expand Down Expand Up @@ -159,10 +171,17 @@ func (c *Commander) Stop(ctx context.Context) error {
// Wait for process to terminate
<-c.waitCh

atomic.StoreInt64(&c.running, 0)
c.running.Store(false)

// Let goroutine know process is finished.
close(finished)

return innerErr
}

// IsStopping returns true if Stop() was called.
func (c *Commander) IsStopping() bool {
c.isStoppingMutex.RLock()
defer c.isStoppingMutex.RUnlock()
return c.isStoppingFlag
}
33 changes: 32 additions & 1 deletion internal/examples/supervisor/supervisor/supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import (
"fmt"
"os"
"testing"

"time"

"github.com/stretchr/testify/assert"

"github.com/open-telemetry/opamp-go/internal"
Expand Down Expand Up @@ -62,3 +63,33 @@ agent:

supervisor.Shutdown()
}

func TestShutdownRaceCondition(t *testing.T) {
tmpDir := changeCurrentDir(t)
os.WriteFile("supervisor.yaml", []byte(fmt.Sprintf(`
server:
endpoint: ws://127.0.0.1:4320/v1/opamp
agent:
executable: %s/dummy_agent.sh`, tmpDir)), 0644)

os.WriteFile("dummy_agent.sh", []byte("#!/bin/sh\nsleep 9999\n"), 0755)

startOpampServer(t)

// There's no great way to ensure Shutdown gets called before Start.
// The DelayLogger ensures some delay before the goroutine gets started.
var supervisor *Supervisor
var err error
supervisor, err = NewSupervisor(&internal.DelayLogger{})
supervisor.Shutdown()
supervisor.hasNewConfig <- struct{}{}

assert.NoError(t, err)

// The Shutdown method has been called before the runAgentProcess goroutine
// gets started and has a chance to load a new process. Make sure no PID
// has been launched.
assert.Never(t, func() bool {
return supervisor.commander.Pid() != 0
}, 2*time.Second, 10*time.Millisecond)
}
10 changes: 10 additions & 0 deletions internal/noplogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"context"
"time"

"github.com/open-telemetry/opamp-go/client/types"
)
Expand All @@ -12,3 +13,12 @@

func (l *NopLogger) Debugf(ctx context.Context, format string, v ...interface{}) {}
func (l *NopLogger) Errorf(ctx context.Context, format string, v ...interface{}) {}

type DelayLogger struct{}

func (l *DelayLogger) Debugf(ctx context.Context, format string, v ...interface{}) {
time.Sleep(10 * time.Millisecond)

Check warning on line 20 in internal/noplogger.go

View check run for this annotation

Codecov / codecov/patch

internal/noplogger.go#L19-L20

Added lines #L19 - L20 were not covered by tests
}
func (l *DelayLogger) Errorf(ctx context.Context, format string, v ...interface{}) {
time.Sleep(10 * time.Millisecond)

Check warning on line 23 in internal/noplogger.go

View check run for this annotation

Codecov / codecov/patch

internal/noplogger.go#L22-L23

Added lines #L22 - L23 were not covered by tests
}
Loading