Skip to content

Commit

Permalink
Gracefully close client websocket connections
Browse files Browse the repository at this point in the history
  • Loading branch information
evan-bradley committed Sep 20, 2023
1 parent fd3066f commit ca9f1d7
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 3 deletions.
7 changes: 7 additions & 0 deletions client/internal/nextmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ func (s *NextMessage) Update(modifier func(msg *protobufs.AgentToServer)) {
s.messageMutex.Unlock()
}

// IsPending returns whether there is a pending message to be sent.
func (s *NextMessage) IsPending() bool {
s.messageMutex.Lock()
defer s.messageMutex.Unlock()
return s.messagePending
}

// PopPending returns the next message to be sent, if it is pending or nil otherwise.
// Clears the "pending" flag.
func (s *NextMessage) PopPending() *protobufs.AgentToServer {
Expand Down
41 changes: 38 additions & 3 deletions client/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,41 @@ func (c *wsClient) Stop(ctx context.Context) error {
conn := c.conn
c.connMutex.RUnlock()

ticker := time.NewTicker(50 * time.Millisecond)

// Wait for all remaining messages to be sent. Continuing to attempt
// to send messages after calling Stop will eventually result in lost
// messages.
select {
case <-ticker.C:
if !c.sender.NextMessage().IsPending() {
break
}
case <-time.After(3 * time.Second):
break

Check warning on line 99 in client/wsclient.go

View check run for this annotation

Codecov / codecov/patch

client/wsclient.go#L98-L99

Added lines #L98 - L99 were not covered by tests
}

ticker.Stop()

if conn != nil {
defaultCloseHandler := conn.CloseHandler()
closed := make(chan bool)

// The server should respond with a close message of its own, which will
// trigger this callback. At this point the close sequence has been
// completed and the TCP connection can be gracefully closed.
conn.SetCloseHandler(func(code int, text string) error {
closed <- true
return defaultCloseHandler(code, text)
})

message := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second))

select {
case <-time.After(3 * time.Second):
case <-closed:
}
_ = conn.Close()
}

Expand Down Expand Up @@ -193,9 +227,10 @@ func (c *wsClient) ensureConnected(ctx context.Context) error {
}

// runOneCycle performs the following actions:
// 1. connect (try until succeeds).
// 2. send first status report.
// 3. receive and process messages until error happens.
// 1. connect (try until succeeds).
// 2. send first status report.
// 3. receive and process messages until error happens.
//
// If it encounters an error it closes the connection and returns.
// Will stop and return if Stop() is called (ctx is cancelled, isStopping is set).
func (c *wsClient) runOneCycle(ctx context.Context) {
Expand Down

0 comments on commit ca9f1d7

Please sign in to comment.