Skip to content

Commit

Permalink
Allow OpAMP server to request an agent restart or shutdown (#53)
Browse files Browse the repository at this point in the history
* Adds ServerToAgentCommand to allow the server to request a restart or shutdown.
* Adds AcceptsRestartRequests to AgentCapabilities so that agents can indicate if they support restart.
* Adds OnCommand to the client Callbacks which is called when the server requests shutdown via the ServerToAgentCommand
* Moved client.CallbacksStruct to types.CallbacksStruct so that it could be used it in receiver_test.go without an import cycle.
  • Loading branch information
andykellr authored Mar 29, 2022
1 parent 3c62aca commit 528e802
Show file tree
Hide file tree
Showing 8 changed files with 838 additions and 561 deletions.
126 changes: 0 additions & 126 deletions client/callbacks.go

This file was deleted.

12 changes: 6 additions & 6 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestConnectNoServer(t *testing.T) {
func TestOnConnectFail(t *testing.T) {
var connectErr atomic.Value
settings := createNoServerSettings()
settings.Callbacks = CallbacksStruct{
settings.Callbacks = types.CallbacksStruct{
OnConnectFailedFunc: func(err error) {
connectErr.Store(err)
},
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestConnectWithServer(t *testing.T) {
// Start a client.
var connected int64
settings := StartSettings{
Callbacks: CallbacksStruct{
Callbacks: types.CallbacksStruct{
OnConnectFunc: func() {
atomic.StoreInt64(&connected, 1)
},
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestConnectWithServer503(t *testing.T) {
var clientConnected int64
var connectErr atomic.Value
settings := StartSettings{
Callbacks: CallbacksStruct{
Callbacks: types.CallbacksStruct{
OnConnectFunc: func() {
atomic.StoreInt64(&clientConnected, 1)
assert.Fail(t, "Client should not be able to connect")
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestDisconnectByServer(t *testing.T) {
var connected int64
var connectErr atomic.Value
settings := StartSettings{
Callbacks: CallbacksStruct{
Callbacks: types.CallbacksStruct{
OnConnectFunc: func() {
atomic.StoreInt64(&connected, 1)
},
Expand Down Expand Up @@ -266,7 +266,7 @@ func TestFirstStatusReport(t *testing.T) {
// Start a client.
var connected, remoteConfigReceived int64
settings := StartSettings{
Callbacks: CallbacksStruct{
Callbacks: types.CallbacksStruct{
OnConnectFunc: func() {
atomic.AddInt64(&connected, 1)
},
Expand Down Expand Up @@ -472,7 +472,7 @@ func TestConnectionSettings(t *testing.T) {

// Start a client.
settings := StartSettings{
Callbacks: CallbacksStruct{
Callbacks: types.CallbacksStruct{
OnOpampConnectionSettingsFunc: func(
ctx context.Context, settings *protobufs.ConnectionSettings,
) error {
Expand Down
12 changes: 12 additions & 0 deletions client/internal/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ func (r *Receiver) receiveMessage(msg *protobufs.ServerToAgent) error {

func (r *Receiver) processReceivedMessage(ctx context.Context, msg *protobufs.ServerToAgent) {
if r.callbacks != nil {
// If a command message exists, other messages will be ignored
if msg.Command != nil {
r.rcvCommand(msg.Command)
return
}

reportStatus := r.rcvRemoteConfig(ctx, msg.RemoteConfig, msg.Flags)

r.rcvConnectionSettings(ctx, msg.ConnectionSettings)
Expand Down Expand Up @@ -168,3 +174,9 @@ func (r *Receiver) processErrorResponse(body *protobufs.ServerErrorResponse) {
func (r *Receiver) rcvAddonsAvailable(addons *protobufs.AddonsAvailable) {
// TODO: implement this.
}

func (r *Receiver) rcvCommand(command *protobufs.ServerToAgentCommand) {
if command != nil {
r.callbacks.OnCommand(command)
}
}
105 changes: 105 additions & 0 deletions client/internal/receiver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package internal

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"

"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
)

type TestLogger struct {
*testing.T
}

func (logger TestLogger) Debugf(format string, v ...interface{}) {
logger.Logf(format, v...)
}

type commandAction int

const (
none commandAction = iota
restart
unknown
)

func TestServerToAgentCommand(t *testing.T) {

tests := []struct {
command *protobufs.ServerToAgentCommand
action commandAction
message string
}{
{
command: nil,
action: none,
message: "No command should result in no action",
},
{
command: &protobufs.ServerToAgentCommand{
Type: protobufs.ServerToAgentCommand_Restart,
},
action: restart,
message: "A Restart command should result in a restart",
},
{
command: &protobufs.ServerToAgentCommand{
Type: -1,
},
action: unknown,
message: "An unknown command is still passed to the OnCommand callback",
},
}

for i, test := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
action := none

callbacks := types.CallbacksStruct{
OnCommandFunc: func(command *protobufs.ServerToAgentCommand) error {
switch command.Type {
case protobufs.ServerToAgentCommand_Restart:
action = restart
default:
action = unknown
}
return nil
},
}
receiver := NewReceiver(TestLogger{t}, callbacks, nil, nil)
receiver.processReceivedMessage(context.Background(), &protobufs.ServerToAgent{
Command: test.command,
})
assert.Equal(t, test.action, action, test.message)
})
}
}

func TestServerToAgentCommandExclusive(t *testing.T) {
calledCommand := false
calledRemoteConfig := false

callbacks := types.CallbacksStruct{
OnCommandFunc: func(command *protobufs.ServerToAgentCommand) error {
calledCommand = true
return nil
},
OnRemoteConfigFunc: func(ctx context.Context, remoteConfig *protobufs.AgentRemoteConfig) (effectiveConfig *protobufs.EffectiveConfig, configChanged bool, err error) {
calledRemoteConfig = true
return nil, false, nil
},
}
receiver := NewReceiver(TestLogger{t}, callbacks, nil, nil)
receiver.processReceivedMessage(context.Background(), &protobufs.ServerToAgent{
Command: &protobufs.ServerToAgentCommand{
Type: protobufs.ServerToAgentCommand_Restart,
},
RemoteConfig: &protobufs.AgentRemoteConfig{},
})
assert.Equal(t, true, calledCommand, "OnCommand should be called when a Command is specified")
assert.Equal(t, false, calledRemoteConfig, "OnRemoteConfig should not be called when a Command is specified")
}
Loading

0 comments on commit 528e802

Please sign in to comment.