Skip to content

Commit

Permalink
Add ServerToAgent_RequestRestart flag to request agent restart
Browse files Browse the repository at this point in the history
  • Loading branch information
andykellr committed Jan 18, 2022
1 parent 5255de5 commit f9920a9
Show file tree
Hide file tree
Showing 8 changed files with 501 additions and 424 deletions.
126 changes: 0 additions & 126 deletions client/callbacks.go

This file was deleted.

12 changes: 6 additions & 6 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestConnectNoServer(t *testing.T) {
func TestOnConnectFail(t *testing.T) {
var connectErr atomic.Value
settings := createNoServerSettings()
settings.Callbacks = CallbacksStruct{
settings.Callbacks = types.CallbacksStruct{
OnConnectFailedFunc: func(err error) {
connectErr.Store(err)
},
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestConnectWithServer(t *testing.T) {
// Start a client.
var connected int64
settings := StartSettings{
Callbacks: CallbacksStruct{
Callbacks: types.CallbacksStruct{
OnConnectFunc: func() {
atomic.StoreInt64(&connected, 1)
},
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestConnectWithServer503(t *testing.T) {
var clientConnected int64
var connectErr atomic.Value
settings := StartSettings{
Callbacks: CallbacksStruct{
Callbacks: types.CallbacksStruct{
OnConnectFunc: func() {
atomic.StoreInt64(&clientConnected, 1)
assert.Fail(t, "Client should not be able to connect")
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestDisconnectByServer(t *testing.T) {
var connected int64
var connectErr atomic.Value
settings := StartSettings{
Callbacks: CallbacksStruct{
Callbacks: types.CallbacksStruct{
OnConnectFunc: func() {
atomic.StoreInt64(&connected, 1)
},
Expand Down Expand Up @@ -266,7 +266,7 @@ func TestFirstStatusReport(t *testing.T) {
// Start a client.
var connected, remoteConfigReceived int64
settings := StartSettings{
Callbacks: CallbacksStruct{
Callbacks: types.CallbacksStruct{
OnConnectFunc: func() {
atomic.AddInt64(&connected, 1)
},
Expand Down Expand Up @@ -472,7 +472,7 @@ func TestConnectionSettings(t *testing.T) {

// Start a client.
settings := StartSettings{
Callbacks: CallbacksStruct{
Callbacks: types.CallbacksStruct{
OnOpampConnectionSettingsFunc: func(
ctx context.Context, settings *protobufs.ConnectionSettings,
) error {
Expand Down
7 changes: 7 additions & 0 deletions client/internal/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func (r *Receiver) processReceivedMessage(ctx context.Context, msg *protobufs.Se

r.rcvConnectionSettings(ctx, msg.ConnectionSettings)
r.rcvAddonsAvailable(msg.AddonsAvailable)
r.rcvRestartRequest(msg.Flags)

if reportStatus {
r.sender.ScheduleSend()
Expand Down Expand Up @@ -168,3 +169,9 @@ func (r *Receiver) processErrorResponse(body *protobufs.ServerErrorResponse) {
func (r *Receiver) rcvAddonsAvailable(addons *protobufs.AddonsAvailable) {
// TODO: implement this.
}

func (r *Receiver) rcvRestartRequest(flags protobufs.ServerToAgent_Flags) {
if flags&protobufs.ServerToAgent_RequestRestart != 0 {
r.callbacks.OnRestartRequested()
}
}
50 changes: 50 additions & 0 deletions client/internal/receiver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package internal

import (
"context"
"testing"

"github.com/stretchr/testify/assert"

"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
)

type TestLogger struct {
*testing.T
}

func (logger TestLogger) Debugf(format string, v ...interface{}) {
logger.Logf(format, v...)
}

func TestRequestRestart(t *testing.T) {
called := false

callbacks := types.CallbacksStruct{
OnRestartRequestedFunc: func() error {
called = true
return nil
},
}
receiver := NewReceiver(TestLogger{t}, callbacks, nil, nil)
receiver.processReceivedMessage(context.Background(), &protobufs.ServerToAgent{
Flags: protobufs.ServerToAgent_RequestRestart,
})
assert.Equal(t, true, called, "requestRestart flag should trigger the OnRestartRequested callback")
}

func TestNoRequestRestart(t *testing.T) {
called := false
callbacks := types.CallbacksStruct{
OnRestartRequestedFunc: func() error {
called = true
return nil
},
}
receiver := NewReceiver(TestLogger{t}, callbacks, nil, nil)
receiver.processReceivedMessage(context.Background(), &protobufs.ServerToAgent{
Flags: 0,
})
assert.Equal(t, false, called, "without RequestRestart flag, do not trigger the OnRestartRequested callback")
}
130 changes: 130 additions & 0 deletions client/types/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,137 @@ type Callbacks interface {
// syncer can be used to initiate syncing the package from the server.
OnAgentPackageAvailable(addons *protobufs.AgentPackageAvailable, syncer AgentPackageSyncer) error

// OnRestartRequested is called when the server requests a restart of the connected agent.
OnRestartRequested() error

// For all methods that accept a context parameter the caller may cancel the
// context if processing takes too long. In that case the method should return
// as soon as possible with an error.
}

type CallbacksStruct struct {
OnConnectFunc func()
OnConnectFailedFunc func(err error)
OnErrorFunc func(err *protobufs.ServerErrorResponse)

OnRemoteConfigFunc func(
ctx context.Context,
remoteConfig *protobufs.AgentRemoteConfig,
) (effectiveConfig *protobufs.EffectiveConfig, configChanged bool, err error)

OnOpampConnectionSettingsFunc func(
ctx context.Context,
settings *protobufs.ConnectionSettings,
) error
OnOpampConnectionSettingsAcceptedFunc func(
settings *protobufs.ConnectionSettings,
)

OnOwnTelemetryConnectionSettingsFunc func(
ctx context.Context,
telemetryType OwnTelemetryType,
settings *protobufs.ConnectionSettings,
) error

OnOtherConnectionSettingsFunc func(
ctx context.Context,
name string,
settings *protobufs.ConnectionSettings,
) error

OnAddonsAvailableFunc func(ctx context.Context, addons *protobufs.AddonsAvailable, syncer AddonSyncer) error
OnAgentPackageAvailableFunc func(addons *protobufs.AgentPackageAvailable, syncer AgentPackageSyncer) error

OnRestartRequestedFunc func() error
}

var _ Callbacks = (*CallbacksStruct)(nil)

func (c CallbacksStruct) OnConnect() {
if c.OnConnectFunc != nil {
c.OnConnectFunc()
}
}

func (c CallbacksStruct) OnConnectFailed(err error) {
if c.OnConnectFailedFunc != nil {
c.OnConnectFailedFunc(err)
}
}

func (c CallbacksStruct) OnError(err *protobufs.ServerErrorResponse) {
if c.OnErrorFunc != nil {
c.OnErrorFunc(err)
}
}

func (c CallbacksStruct) OnRemoteConfig(
ctx context.Context,
remoteConfig *protobufs.AgentRemoteConfig,
) (effectiveConfig *protobufs.EffectiveConfig, configChanged bool, err error) {
if c.OnRemoteConfigFunc != nil {
return c.OnRemoteConfigFunc(ctx, remoteConfig)
}
return nil, false, nil
}

func (c CallbacksStruct) OnOpampConnectionSettings(
ctx context.Context, settings *protobufs.ConnectionSettings,
) error {
if c.OnOpampConnectionSettingsFunc != nil {
return c.OnOpampConnectionSettingsFunc(ctx, settings)
}
return nil
}

func (c CallbacksStruct) OnOpampConnectionSettingsAccepted(settings *protobufs.ConnectionSettings) {
if c.OnOpampConnectionSettingsAcceptedFunc != nil {
c.OnOpampConnectionSettingsAcceptedFunc(settings)
}
}

func (c CallbacksStruct) OnOwnTelemetryConnectionSettings(
ctx context.Context, telemetryType OwnTelemetryType,
settings *protobufs.ConnectionSettings,
) error {
if c.OnOwnTelemetryConnectionSettingsFunc != nil {
return c.OnOwnTelemetryConnectionSettingsFunc(ctx, telemetryType, settings)
}
return nil
}

func (c CallbacksStruct) OnOtherConnectionSettings(
ctx context.Context, name string, settings *protobufs.ConnectionSettings,
) error {
if c.OnOtherConnectionSettingsFunc != nil {
return c.OnOtherConnectionSettingsFunc(ctx, name, settings)
}
return nil
}

func (c CallbacksStruct) OnAddonsAvailable(
ctx context.Context,
addons *protobufs.AddonsAvailable,
syncer AddonSyncer,
) error {
if c.OnAddonsAvailableFunc != nil {
return c.OnAddonsAvailableFunc(ctx, addons, syncer)
}
return nil
}

func (c CallbacksStruct) OnAgentPackageAvailable(
addons *protobufs.AgentPackageAvailable, syncer AgentPackageSyncer,
) error {
if c.OnAgentPackageAvailableFunc != nil {
return c.OnAgentPackageAvailableFunc(addons, syncer)
}
return nil
}

func (c CallbacksStruct) OnRestartRequested() error {
if c.OnRestartRequestedFunc != nil {
return c.OnRestartRequestedFunc()
}
return nil
}
2 changes: 1 addition & 1 deletion internal/examples/agent/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (agent *Agent) start() error {
OpAMPServerURL: "ws://127.0.0.1:4320/v1/opamp",
InstanceUid: agent.instanceId.String(),
AgentDescription: agent.agentDescription,
Callbacks: client.CallbacksStruct{
Callbacks: types.CallbacksStruct{
OnConnectFunc: func() {
agent.logger.Debugf("Connected to the server.")
},
Expand Down
Loading

0 comments on commit f9920a9

Please sign in to comment.