Skip to content

Commit

Permalink
Stop fully when accepting OpAMP connection settings in the agent exam…
Browse files Browse the repository at this point in the history
…ple (open-telemetry#184)

Fixes open-telemetry#178

I reverted open-telemetry#170 which was unnecessary.
We don't really need to call Stop() from callbacks. In fact doing
stopping on a separate goroutine is preferable, properly waiting until
stopping is complete and then trying new connection settings.
  • Loading branch information
tigrannajaryan authored Jul 4, 2023
1 parent 13a6fd1 commit 715146e
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 227 deletions.
8 changes: 3 additions & 5 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,9 @@ type OpAMPClient interface {
// May be called only once.
// After this call returns successfully it is guaranteed that no
// callbacks will be called. Stop() will cancel context of any in-fly
// callbacks.
//
// If a callback is in progress (e.g. OnMessage is called but not finished)
// Stop() initiates stopping and returns without waiting for stopping to finish.
//
// callbacks, but will wait until such in-fly callbacks are returned before
// Stop returns, so make sure the callbacks don't block infinitely and react
// promptly to context cancellations.
// Once stopped OpAMPClient cannot be started again.
Stop(ctx context.Context) error

Expand Down
79 changes: 0 additions & 79 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,85 +483,6 @@ func TestIncludesDetailsOnReconnect(t *testing.T) {
assert.NoError(t, err)
}

func TestStopFromCallback(t *testing.T) {
// This test verifies calling Stop() from a callback. We had a bug previously
// where Stop() would hang if called from a callback.

callbacksToTest := []string{"connect", "opamp", "message"}
for _, callbackToTest := range callbacksToTest {
t.Run(
callbackToTest, func(t *testing.T) {

testClients(
t, func(t *testing.T, client OpAMPClient) {
var called int64

hash := []byte{1, 2, 3}
opampSettings := &protobufs.OpAMPConnectionSettings{DestinationEndpoint: "http://opamp.com"}

// Start a Server.
srv := internal.StartMockServer(t)
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
if msg != nil {
return &protobufs.ServerToAgent{
ConnectionSettings: &protobufs.ConnectionSettingsOffers{
Hash: hash,
Opamp: opampSettings,
},
}
}
return nil
}

// Start a client.
settings := types.StartSettings{
Callbacks: types.CallbacksStruct{
OnConnectFunc: func() {
if callbackToTest == "connect" {
client.Stop(context.Background())
atomic.StoreInt64(&called, 1)
}
},
OnOpampConnectionSettingsFunc: func(
ctx context.Context, settings *protobufs.OpAMPConnectionSettings,
) error {
if callbackToTest == "opamp" {
client.Stop(context.Background())
atomic.StoreInt64(&called, 1)
}
return nil
},
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
if callbackToTest == "message" {
client.Stop(context.Background())
atomic.StoreInt64(&called, 1)
}
},
},
Capabilities: protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings,
}
settings.OpAMPServerURL = "ws://" + srv.Endpoint
prepareClient(t, &settings, client)

assert.NoError(t, client.Start(context.Background(), settings))

eventually(
t, func() bool {
return atomic.LoadInt64(&called) == 1
},
)

// Shutdown the Server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
})
}
}

func createEffectiveConfig() *protobufs.EffectiveConfig {
cfg := &protobufs.EffectiveConfig{
ConfigMap: &protobufs.AgentConfigMap{
Expand Down
113 changes: 6 additions & 107 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"

"google.golang.org/protobuf/proto"

Expand All @@ -27,105 +26,11 @@ var (
errReportsPackageStatusesNotSet = errors.New("ReportsPackageStatuses capability is not set")
)

// CallbacksWrapper wraps Callbacks such that it is possible to query if any callback
// function is in progress (called, but not yet returned). This is necessary for
// safe handling of certain ClientCommon methods when they are called from the callbacks.
// See for example Stop() implementation.
type CallbacksWrapper struct {
wrapped types.Callbacks
// Greater than zero if currently processing a callback.
inCallback *int64
}

func (cc *CallbacksWrapper) OnConnect() {
cc.EnterCallback()
defer cc.LeaveCallback()
cc.wrapped.OnConnect()
}

func (cc *CallbacksWrapper) OnConnectFailed(err error) {
cc.EnterCallback()
defer cc.LeaveCallback()
cc.wrapped.OnConnectFailed(err)
}

func (cc *CallbacksWrapper) OnError(err *protobufs.ServerErrorResponse) {
cc.EnterCallback()
defer cc.LeaveCallback()
cc.wrapped.OnError(err)
}

func (cc *CallbacksWrapper) OnMessage(ctx context.Context, msg *types.MessageData) {
cc.EnterCallback()
defer cc.LeaveCallback()
cc.wrapped.OnMessage(ctx, msg)
}

func (cc *CallbacksWrapper) OnOpampConnectionSettings(
ctx context.Context, settings *protobufs.OpAMPConnectionSettings,
) error {
cc.EnterCallback()
defer cc.LeaveCallback()
return cc.wrapped.OnOpampConnectionSettings(ctx, settings)
}

func (cc *CallbacksWrapper) OnOpampConnectionSettingsAccepted(settings *protobufs.OpAMPConnectionSettings) {
cc.EnterCallback()
defer cc.LeaveCallback()
cc.wrapped.OnOpampConnectionSettingsAccepted(settings)
}

func (cc *CallbacksWrapper) SaveRemoteConfigStatus(ctx context.Context, status *protobufs.RemoteConfigStatus) {
cc.EnterCallback()
defer cc.LeaveCallback()
cc.wrapped.SaveRemoteConfigStatus(ctx, status)
}

func (cc *CallbacksWrapper) GetEffectiveConfig(ctx context.Context) (*protobufs.EffectiveConfig, error) {
cc.EnterCallback()
defer cc.LeaveCallback()
return cc.wrapped.GetEffectiveConfig(ctx)
}

func (cc *CallbacksWrapper) OnCommand(command *protobufs.ServerToAgentCommand) error {
cc.EnterCallback()
defer cc.LeaveCallback()
return cc.wrapped.OnCommand(command)
}

var _ types.Callbacks = (*CallbacksWrapper)(nil)

func NewCallbacksWrapper(wrapped types.Callbacks) *CallbacksWrapper {
zero := int64(0)

if wrapped == nil {
// Make sure it is always safe to call Callbacks.
wrapped = types.CallbacksStruct{}
}

return &CallbacksWrapper{
wrapped: wrapped,
inCallback: &zero,
}
}

func (cc *CallbacksWrapper) EnterCallback() {
atomic.AddInt64(cc.inCallback, 1)
}

func (cc *CallbacksWrapper) LeaveCallback() {
atomic.AddInt64(cc.inCallback, -1)
}

func (cc *CallbacksWrapper) InCallback() bool {
return atomic.LoadInt64(cc.inCallback) != 0
}

// ClientCommon contains the OpAMP logic that is common between WebSocket and
// plain HTTP transports.
type ClientCommon struct {
Logger types.Logger
Callbacks *CallbacksWrapper
Callbacks types.Callbacks

// Agent's capabilities defined at Start() time.
Capabilities protobufs.AgentCapabilities
Expand Down Expand Up @@ -225,7 +130,11 @@ func (c *ClientCommon) PrepareStart(
}

// Prepare callbacks.
c.Callbacks = NewCallbacksWrapper(settings.Callbacks)
c.Callbacks = settings.Callbacks
if c.Callbacks == nil {
// Make sure it is always safe to call Callbacks.
c.Callbacks = types.CallbacksStruct{}
}

if err := c.sender.SetInstanceUid(settings.InstanceUid); err != nil {
return err
Expand All @@ -247,16 +156,6 @@ func (c *ClientCommon) Stop(ctx context.Context) error {

cancelFunc()

if c.Callbacks.InCallback() {
// Stop() is called from a callback. We cannot wait and block here
// because the c.stoppedSignal may not be set until the callback
// returns. This is the case for example when OnMessage callback is
// called. So, for this case we return immediately and the caller
// needs to be aware that Stop() does not wait for stopping to
// finish in this case.
return nil
}

// Wait until stopping is finished.
select {
case <-ctx.Done():
Expand Down
7 changes: 3 additions & 4 deletions client/internal/httpsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ import (
"time"

"github.com/cenkalti/backoff/v4"
"google.golang.org/protobuf/proto"

"github.com/open-telemetry/opamp-go/internal"
"google.golang.org/protobuf/proto"

"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
Expand All @@ -37,7 +36,7 @@ type HTTPSender struct {
url string
logger types.Logger
client *http.Client
callbacks *CallbacksWrapper
callbacks types.Callbacks
pollingIntervalMs int64
compressionEnabled bool

Expand Down Expand Up @@ -71,7 +70,7 @@ func NewHTTPSender(logger types.Logger) *HTTPSender {
func (h *HTTPSender) Run(
ctx context.Context,
url string,
callbacks *CallbacksWrapper,
callbacks types.Callbacks,
clientSyncedState *ClientSyncedState,
packagesStateProvider types.PackagesStateProvider,
capabilities protobufs.AgentCapabilities,
Expand Down
7 changes: 3 additions & 4 deletions client/internal/httpsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/open-telemetry/opamp-go/client/types"
sharedinternal "github.com/open-telemetry/opamp-go/internal"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/stretchr/testify/assert"
)

func TestHTTPSenderRetryForStatusTooManyRequests(t *testing.T) {
Expand Down Expand Up @@ -42,12 +41,12 @@ func TestHTTPSenderRetryForStatusTooManyRequests(t *testing.T) {
}},
}
})
sender.callbacks = NewCallbacksWrapper(types.CallbacksStruct{
sender.callbacks = types.CallbacksStruct{
OnConnectFunc: func() {
},
OnConnectFailedFunc: func(_ error) {
},
})
}
sender.url = url
start := time.Now()
resp, err := sender.sendRequestWithRetries(ctx)
Expand Down
4 changes: 2 additions & 2 deletions client/internal/receivedprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type receivedProcessor struct {
logger types.Logger

// Callbacks to call for corresponding messages.
callbacks *CallbacksWrapper
callbacks types.Callbacks

// A sender to cooperate with when the received message has an impact on
// what will be sent later.
Expand All @@ -30,7 +30,7 @@ type receivedProcessor struct {

func newReceivedProcessor(
logger types.Logger,
callbacks *CallbacksWrapper,
callbacks types.Callbacks,
sender Sender,
clientSyncedState *ClientSyncedState,
packagesStateProvider types.PackagesStateProvider,
Expand Down
5 changes: 2 additions & 3 deletions client/internal/wsreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"

"github.com/gorilla/websocket"

"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/internal"
"github.com/open-telemetry/opamp-go/protobufs"
Expand All @@ -16,15 +15,15 @@ type wsReceiver struct {
conn *websocket.Conn
logger types.Logger
sender *WSSender
callbacks *CallbacksWrapper
callbacks types.Callbacks
processor receivedProcessor
}

// NewWSReceiver creates a new Receiver that uses WebSocket to receive
// messages from the server.
func NewWSReceiver(
logger types.Logger,
callbacks *CallbacksWrapper,
callbacks types.Callbacks,
conn *websocket.Conn,
sender *WSSender,
clientSyncedState *ClientSyncedState,
Expand Down
4 changes: 2 additions & 2 deletions client/internal/wsreceiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestServerToAgentCommand(t *testing.T) {
remoteConfigStatus: &protobufs.RemoteConfigStatus{},
}
sender := WSSender{}
receiver := NewWSReceiver(TestLogger{t}, NewCallbacksWrapper(callbacks), nil, &sender, &clientSyncedState, nil, 0)
receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, &sender, &clientSyncedState, nil, 0)
receiver.processor.ProcessReceivedMessage(context.Background(), &protobufs.ServerToAgent{
Command: test.command,
})
Expand All @@ -100,7 +100,7 @@ func TestServerToAgentCommandExclusive(t *testing.T) {
},
}
clientSyncedState := ClientSyncedState{}
receiver := NewWSReceiver(TestLogger{t}, NewCallbacksWrapper(callbacks), nil, nil, &clientSyncedState, nil, 0)
receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, nil, &clientSyncedState, nil, 0)
receiver.processor.ProcessReceivedMessage(context.Background(), &protobufs.ServerToAgent{
Command: &protobufs.ServerToAgentCommand{
Type: protobufs.CommandType_CommandType_Restart,
Expand Down
13 changes: 7 additions & 6 deletions client/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (c *wsClient) tryConnectOnce(ctx context.Context) (err error, retryAfter sh
var resp *http.Response
conn, resp, err := c.dialer.DialContext(ctx, c.url.String(), c.requestHeader)
if err != nil {
if !c.common.IsStopping() {
if c.common.Callbacks != nil && !c.common.IsStopping() {
c.common.Callbacks.OnConnectFailed(err)
}
if resp != nil {
Expand All @@ -138,7 +138,9 @@ func (c *wsClient) tryConnectOnce(ctx context.Context) (err error, retryAfter sh
c.connMutex.Lock()
c.conn = conn
c.connMutex.Unlock()
c.common.Callbacks.OnConnect()
if c.common.Callbacks != nil {
c.common.Callbacks.OnConnect()
}

return nil, sharedinternal.OptionalDuration{Defined: false}
}
Expand Down Expand Up @@ -191,10 +193,9 @@ func (c *wsClient) ensureConnected(ctx context.Context) error {
}

// runOneCycle performs the following actions:
// 1. connect (try until succeeds).
// 2. send first status report.
// 3. receive and process messages until error happens.
//
// 1. connect (try until succeeds).
// 2. send first status report.
// 3. receive and process messages until error happens.
// If it encounters an error it closes the connection and returns.
// Will stop and return if Stop() is called (ctx is cancelled, isStopping is set).
func (c *wsClient) runOneCycle(ctx context.Context) {
Expand Down
Loading

0 comments on commit 715146e

Please sign in to comment.