Skip to content

Commit

Permalink
Adds another test to demonstrate and verify sending multiple messages…
Browse files Browse the repository at this point in the history
… in sequence
  • Loading branch information
andykellr committed Feb 19, 2024
1 parent 0e10092 commit 2c87247
Showing 1 changed file with 76 additions and 0 deletions.
76 changes: 76 additions & 0 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"math/rand"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -1844,3 +1845,78 @@ func TestSendCustomMessageConflict(t *testing.T) {
)
})
}

// TestStreamCustomMessages tests the ability to send may custom messages in succession.
func TestCustomMessagesSendAndWait(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// Start a Server.
srv := internal.StartMockServer(t)

numTestMessages := 10
makeTestCustomMessage := func(i int) *protobufs.CustomMessage {
return &protobufs.CustomMessage{
Capability: "local.test.example",
Type: "hello",
Data: []byte(fmt.Sprintf("test message %d", i)),
}
}

// The OnMessage callback puts CustomMessages on a channel to be verified
rcvCustomMessages := make(chan *protobufs.CustomMessage)
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
if msg.CustomMessage != nil {
select {
case rcvCustomMessages <- msg.CustomMessage:
case <-ctx.Done():
assert.NoError(t, ctx.Err())
}
}
return nil
}

// Start a client.
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
CustomCapabilities: []string{"local.test.example"},
}
prepareClient(t, &settings, client)

assert.NoError(t, client.Start(context.Background(), settings))

// Send the test messages from a separate goroutine
go func() {
for i := 0; i < numTestMessages; i++ {
messageSendingChannel, err := client.SendCustomMessage(makeTestCustomMessage(i))
assert.NoError(t, err)

// Wait for the sending channel to be closed before sending another message.
// Without this the next SendCustomMessage will fail with ErrCustomMessagePending.
select {
case <-messageSendingChannel:
case <-ctx.Done():
assert.NoError(t, ctx.Err())
}
}
}()

// Verify messages received by the server
for i := 0; i < numTestMessages; i++ {
select {
case msg := <-rcvCustomMessages:
assert.True(t, proto.Equal(makeTestCustomMessage(i), msg))
case <-ctx.Done():
assert.NoError(t, ctx.Err())
}
}

// Shutdown the Server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
}

0 comments on commit 2c87247

Please sign in to comment.