Skip to content

Commit

Permalink
Simplify how we handle ping and pong
Browse files Browse the repository at this point in the history
Signed-off-by: Šimon Brandner <simon.bra.ag@gmail.com>
  • Loading branch information
SimonBrandner committed Dec 12, 2022
1 parent 098be79 commit 90ae985
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 72 deletions.
29 changes: 14 additions & 15 deletions pkg/conference/matrix_message_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,25 +52,24 @@ func (c *Conference) onNewParticipant(participantID ParticipantID, inviteEvent *
} else {
messageSink := common.NewMessageSink(participantID, c.peerMessages)

pingInterval := time.Duration(c.config.PingInterval) * time.Second
keepAliveDeadline := time.Duration(c.config.KeepAliveTimeout) * time.Second
sendPing := func() {
participant.sendDataChannelMessage(event.Event{
Type: event.FocusCallPing,
Content: event.Content{},
})
}
onDeadLine := func() {
messageSink.Send(peer.LeftTheCall{Reason: event.CallHangupKeepAliveTimeout})
}
peer, answer, err := peer.NewPeer(
inviteEvent.Offer.SDP,
messageSink,
logger,
pingInterval,
keepAliveDeadline,
sendPing,
onDeadLine,
peer.PingPongConfig{
Interval: time.Duration(c.config.PingInterval) * time.Second,
Deadline: time.Duration(c.config.KeepAliveTimeout) * time.Second,
PongChannel: make(chan peer.Pong, common.UnboundedChannelSize),
SendPing: func() {
participant.sendDataChannelMessage(event.Event{
Type: event.FocusCallPing,
Content: event.Content{},
})
},
OnDeadLine: func() {
messageSink.Send(peer.LeftTheCall{Reason: event.CallHangupKeepAliveTimeout})
},
},
)
if err != nil {
logger.WithError(err).Errorf("Failed to process SDP offer")
Expand Down
31 changes: 0 additions & 31 deletions pkg/peer/keepalive.go

This file was deleted.

23 changes: 5 additions & 18 deletions pkg/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"io"
"sync"
"time"

"github.com/matrix-org/waterfall/pkg/common"
"github.com/pion/rtcp"
Expand Down Expand Up @@ -33,12 +32,7 @@ type Peer[ID comparable] struct {
logger *logrus.Entry
peerConnection *webrtc.PeerConnection
sink *common.MessageSink[ID, MessageContent]

pong chan Pong
sendPing func()
onDeadLine func()
pingInterval time.Duration
keepAliveDeadline time.Duration
pingPongConfig PingPongConfig

dataChannelMutex sync.Mutex
dataChannel *webrtc.DataChannel
Expand All @@ -49,10 +43,7 @@ func NewPeer[ID comparable](
sdpOffer string,
sink *common.MessageSink[ID, MessageContent],
logger *logrus.Entry,
pingInterval time.Duration,
keepAliveDeadline time.Duration,
sendPing func(),
onDeadLine func(),
pingPongConfig PingPongConfig,
) (*Peer[ID], *webrtc.SessionDescription, error) {
peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{})
if err != nil {
Expand All @@ -64,12 +55,7 @@ func NewPeer[ID comparable](
logger: logger,
peerConnection: peerConnection,
sink: sink,

pong: make(chan Pong, common.UnboundedChannelSize),
pingInterval: pingInterval,
keepAliveDeadline: keepAliveDeadline,
sendPing: sendPing,
onDeadLine: onDeadLine,
pingPongConfig: pingPongConfig,
}

peerConnection.OnTrack(peer.onRtpTrackReceived)
Expand All @@ -84,6 +70,7 @@ func NewPeer[ID comparable](
if sdpAnswer, err := peer.ProcessSDPOffer(sdpOffer); err != nil {
return nil, nil, err
} else {
startPingPong(pingPongConfig)
return peer, sdpAnswer, nil
}
}
Expand Down Expand Up @@ -263,5 +250,5 @@ func (p *Peer[ID]) ProcessSDPOffer(sdpOffer string) (*webrtc.SessionDescription,
// We need to update the last heartbeat time. If the peer is not active for too long, we will
// consider peer's connection as stalled and will close it.
func (p *Peer[ID]) ProcessPong() {
p.pong <- Pong{}
p.pingPongConfig.PongChannel <- Pong{}
}
33 changes: 33 additions & 0 deletions pkg/peer/ping_pong.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package peer

import "time"

type Pong struct{}

type PingPongConfig struct {
Interval time.Duration
Deadline time.Duration
PongChannel chan Pong
SendPing func()
OnDeadLine func()
}

// Starts a goroutine that will execute `onDeadLine` closure in case nothing has been published
// to the `heartBeat` channel for `deadline` duration. The goroutine stops once the channel is closed.
func startPingPong(config PingPongConfig) {
go func() {
for range time.Tick(config.Interval) {
config.SendPing()

select {
case <-time.After(config.Deadline):
config.OnDeadLine()
return
case _, ok := <-config.PongChannel:
if !ok {
return
}
}
}
}()
}
8 changes: 0 additions & 8 deletions pkg/peer/webrtc.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,6 @@ func (p *Peer[ID]) onDataChannelReady(dc *webrtc.DataChannel) {
dc.OnOpen(func() {
p.logger.Info("Data channel opened")
p.sink.Send(DataChannelAvailable{})

startKeepAlive(
p.pingInterval,
p.keepAliveDeadline,
p.pong,
p.sendPing,
p.onDeadLine,
)
})

dc.OnMessage(func(msg webrtc.DataChannelMessage) {
Expand Down

0 comments on commit 90ae985

Please sign in to comment.