-
Notifications
You must be signed in to change notification settings - Fork 69
/
wsreceiver.go
74 lines (65 loc) · 1.87 KB
/
wsreceiver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package internal
import (
"context"
"fmt"
"github.com/gorilla/websocket"
"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/internal"
"github.com/open-telemetry/opamp-go/protobufs"
)
// wsReceiver implements the WebSocket client's receiving portion of OpAMP protocol.
type wsReceiver struct {
conn *websocket.Conn
logger types.Logger
sender *WSSender
callbacks types.Callbacks
processor receivedProcessor
}
// NewWSReceiver creates a new Receiver that uses WebSocket to receive
// messages from the server.
func NewWSReceiver(
logger types.Logger,
callbacks types.Callbacks,
conn *websocket.Conn,
sender *WSSender,
clientSyncedState *ClientSyncedState,
packagesStateProvider types.PackagesStateProvider,
capabilities protobufs.AgentCapabilities,
) *wsReceiver {
w := &wsReceiver{
conn: conn,
logger: logger,
sender: sender,
callbacks: callbacks,
processor: newReceivedProcessor(logger, callbacks, sender, clientSyncedState, packagesStateProvider, capabilities),
}
return w
}
// ReceiverLoop runs the receiver loop. To stop the receiver cancel the context.
func (r *wsReceiver) ReceiverLoop(ctx context.Context) {
runContext, cancelFunc := context.WithCancel(ctx)
out:
for {
var message protobufs.ServerToAgent
if err := r.receiveMessage(&message); err != nil {
if ctx.Err() == nil && !websocket.IsCloseError(err, websocket.CloseNormalClosure) {
r.logger.Errorf("Unexpected error while receiving: %v", err)
}
break out
} else {
r.processor.ProcessReceivedMessage(runContext, &message)
}
}
cancelFunc()
}
func (r *wsReceiver) receiveMessage(msg *protobufs.ServerToAgent) error {
_, bytes, err := r.conn.ReadMessage()
if err != nil {
return err
}
err = internal.DecodeWSMessage(bytes, msg)
if err != nil {
return fmt.Errorf("cannot decode received message: %w", err)
}
return err
}