Skip to content

Commit

Permalink
feat: support client side heartbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
haoqixu committed Aug 23, 2024
1 parent 5a4798b commit 52d2bef
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 2 deletions.
6 changes: 6 additions & 0 deletions client/internal/httpsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,12 @@ func (h *HTTPSender) receiveResponse(ctx context.Context, resp *http.Response) {
h.receiveProcessor.ProcessReceivedMessage(ctx, &response)
}

func (h *HTTPSender) SetHeartbeatInterval(duration time.Duration) {
if duration != 0 {
h.SetPollingInterval(duration)
}
}

// SetPollingInterval sets the interval between polling. Has effect starting from the
// next polling cycle.
func (h *HTTPSender) SetPollingInterval(duration time.Duration) {
Expand Down
5 changes: 5 additions & 0 deletions client/internal/receivedprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package internal
import (
"context"
"fmt"
"time"

"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
Expand Down Expand Up @@ -209,6 +210,10 @@ func (r *receivedProcessor) rcvOpampConnectionSettings(ctx context.Context, sett
return
}

if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat) {
r.sender.SetHeartbeatInterval(time.Duration(settings.Opamp.HeartbeatIntervalSeconds) * time.Second)
}

if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings) {
err := r.callbacks.OnOpampConnectionSettings(ctx, settings.Opamp)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions client/internal/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal

import (
"errors"
"time"

"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
Expand All @@ -22,6 +23,9 @@ type Sender interface {

// SetInstanceUid sets a new instanceUid to be used for all subsequent messages to be sent.
SetInstanceUid(instanceUid types.InstanceUid) error

// SetHeartbeatInterval sets the interval for the agent heartbeats.
SetHeartbeatInterval(duration time.Duration)
}

// SenderCommon is partial Sender implementation that is common between WebSocket and plain
Expand Down
49 changes: 47 additions & 2 deletions client/internal/wssender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal

import (
"context"
"sync/atomic"
"time"

"github.com/gorilla/websocket"
Expand All @@ -13,7 +14,8 @@ import (
)

const (
defaultSendCloseMessageTimeout = 5 * time.Second
defaultSendCloseMessageTimeout = 5 * time.Second
defaultHeartbeatIntervalSeconds = 30
)

// WSSender implements the WebSocket client's sending portion of OpAMP protocol.
Expand All @@ -25,15 +27,24 @@ type WSSender struct {
// Indicates that the sender has fully stopped.
stopped chan struct{}
err error

heartbeatIntervalUpdated chan struct{}
heartbeatIntervalSeconds atomic.Int64
heartbeatTimer *time.Timer
}

// NewSender creates a new Sender that uses WebSocket to send
// messages to the server.
func NewSender(logger types.Logger) *WSSender {
return &WSSender{
s := &WSSender{
logger: logger,
SenderCommon: NewSenderCommon(),
}
s.heartbeatIntervalUpdated = make(chan struct{}, 1)
s.heartbeatIntervalSeconds.Store(defaultHeartbeatIntervalSeconds)
s.heartbeatTimer = time.NewTimer(0)

return s
}

// Start the sender and send the first message that was set via NextMessage().Update()
Expand Down Expand Up @@ -62,10 +73,43 @@ func (s *WSSender) StoppingErr() error {
return s.err
}

// SetHeartbeatInterval ...
func (s *WSSender) SetHeartbeatInterval(d time.Duration) {
s.heartbeatIntervalSeconds.Store(int64(d.Seconds()))
select {
case s.heartbeatIntervalUpdated <- struct{}{}:
default:
}
}

func (s *WSSender) shouldSendHeartbeat() <-chan time.Time {
t := s.heartbeatTimer

if !t.Stop() {
select {
case <-t.C:
default:
}
}

if d := time.Duration(s.heartbeatIntervalSeconds.Load()) * time.Second; d != 0 {
t.Reset(d)
return t.C
}

// Heartbeat interval is set to Zero, disable heartbeat.
return nil
}

func (s *WSSender) run(ctx context.Context) {
out:
for {
select {
case <-s.shouldSendHeartbeat():
s.NextMessage().Update(func(msg *protobufs.AgentToServer) {})
s.ScheduleSend()
case <-s.heartbeatIntervalUpdated:
// trigger heartbeat timer reset
case <-s.hasPendingMessage:
s.sendNextMessage(ctx)

Expand All @@ -77,6 +121,7 @@ out:
}
}

s.heartbeatTimer.Stop()
close(s.stopped)
}

Expand Down

0 comments on commit 52d2bef

Please sign in to comment.