diff --git a/socketmode/deadman.go b/socketmode/deadman.go deleted file mode 100644 index a6eb1a1d8..000000000 --- a/socketmode/deadman.go +++ /dev/null @@ -1,34 +0,0 @@ -package socketmode - -import "time" - -type deadmanTimer struct { - timeout time.Duration - timer *time.Timer -} - -func newDeadmanTimer(timeout time.Duration) *deadmanTimer { - return &deadmanTimer{ - timeout: timeout, - timer: time.NewTimer(timeout), - } -} - -func (smc *deadmanTimer) Elapsed() <-chan time.Time { - return smc.timer.C -} - -func (smc *deadmanTimer) Reset() { - // FIXME: Race on "deadmanTimer", timer channel cannot be read concurrently while resetting. - // "This should not be done concurrent to other receives from the Timer's channel." - // https://pkg.go.dev/time#Timer.Reset - // See socket_mode_managed_conn.go lines ~59 & ~151. - if !smc.timer.Stop() { - select { - case <-smc.timer.C: - default: - } - } - - smc.timer.Reset(smc.timeout) -} diff --git a/socketmode/socket_mode_managed_conn.go b/socketmode/socket_mode_managed_conn.go index 5bc6b752e..b94456f49 100644 --- a/socketmode/socket_mode_managed_conn.go +++ b/socketmode/socket_mode_managed_conn.go @@ -56,13 +56,12 @@ func (smc *Client) RunContext(ctx context.Context) error { func (smc *Client) run(ctx context.Context, connectionCount int) error { messages := make(chan json.RawMessage, 1) - // FIXME: Race on "deadmanTimer", timer channel cannot be read concurrently while resetting. - // "This should not be done concurrent to other receives from the Timer's channel." - // https://pkg.go.dev/time#Timer.Reset - // See deadman.go line ~22. - deadmanTimer := newDeadmanTimer(smc.maxPingInterval) + pingChan := make(chan time.Time, 1) pingHandler := func(_ string) error { - deadmanTimer.Reset() + select { + case pingChan <- time.Now(): + default: + } return nil } @@ -138,26 +137,44 @@ func (smc *Client) run(ctx context.Context, connectionCount int) error { }() wg.Add(1) - go func() { + go func(pingInterval time.Duration) { defer wg.Done() - - select { - case <-ctx.Done(): + defer func() { // Detect when the connection is dead and try close connection. if err := conn.Close(); err != nil { smc.Debugf("Failed to close connection: %v", err) } + }() + + done := ctx.Done() + var lastPing time.Time + + // More efficient than constantly resetting a timer w/ Stop+Reset + ticker := time.NewTicker(pingInterval) + defer ticker.Stop() - // FIXME: Race on "deadmanTimer", timer channel cannot be read concurrently while resetting. - // "This should not be done concurrent to other receives from the Timer's channel." - // https://pkg.go.dev/time#Timer.Reset - // See deadman.go line ~22. - case <-deadmanTimer.Elapsed(): - sendErr(errors.New("ping timeout: Slack did not send us WebSocket PING for more than Client.maxInterval")) + for { + select { + case <-done: + return - cancel() + case lastPing = <-pingChan: + // This case gets the time of the last ping. + // If this case never fires then the pingHandler was never called + // in which case lastPing is the zero time.Time value, and will 'fail' + // the next tick, causing us to exit. + + case now := <-ticker.C: + // Our last ping is older than our interval + if now.Sub(lastPing) > pingInterval { + sendErr(errors.New("ping timeout: Slack did not send us WebSocket PING for more than Client.maxInterval")) + + cancel() + return + } + } } - }() + }(smc.maxPingInterval) wg.Wait() @@ -294,12 +311,13 @@ func (smc *Client) openAndDial(ctx context.Context, additionalPingHandler func(s smc.Debugf("Failed to dial to the websocket: %s", err) return nil, nil, err } + if additionalPingHandler == nil { + additionalPingHandler = func(_ string) error { return nil } + } conn.SetPingHandler(func(appData string) error { - if additionalPingHandler != nil { - if err := additionalPingHandler(appData); err != nil { - return err - } + if err := additionalPingHandler(appData); err != nil { + return err } smc.handlePing(conn, appData)