Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change event shape to match MSC3898 #70

Merged
merged 10 commits into from
Dec 12, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ require (
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
)

replace maunium.net/go/mautrix => github.com/matrix-org/mautrix-go v0.0.0-20220817142816-160ea900a20b
replace maunium.net/go/mautrix => github.com/matrix-org/mautrix-go v0.0.0-20221210135932-bd593dd0204b
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/matrix-org/mautrix-go v0.0.0-20220817142816-160ea900a20b h1:qKvyphdDykNjyF1vJLaVuWCPfNJWNzP7wHvMV5mw+Ss=
github.com/matrix-org/mautrix-go v0.0.0-20220817142816-160ea900a20b/go.mod h1:hHvNi5iKVAiI2MAdAeXHtP4g9BvNEX2rsQpSF/x6Kx4=
github.com/matrix-org/mautrix-go v0.0.0-20221210135932-bd593dd0204b h1:yMsRQmsBWm7wJurYwnyd7H7wZWawhp52ca62W3MqDA8=
github.com/matrix-org/mautrix-go v0.0.0-20221210135932-bd593dd0204b/go.mod h1:hHvNi5iKVAiI2MAdAeXHtP4g9BvNEX2rsQpSF/x6Kx4=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand Down
8 changes: 6 additions & 2 deletions pkg/conference/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ package conference

// Configuration for the group conferences (calls).
type Config struct {
// Keep-alive timeout for WebRTC connections. If no keep-alive has been received
// from the client for this duration, the connection is considered dead (in seconds).
// Keep-alive timeout for WebRTC connections. If the client doesn't respond
// to an `m.call.ping` with an `m.call.pong` for this amount of time, the
// connection is considered dead. (in seconds, no greater then 30)
KeepAliveTimeout int `yaml:"timeout"`
// The time after which we should send another m.call.ping event to the
// client. (in seconds, greater then 30)
PingInterval int `yaml:"pingInterval"`
SimonBrandner marked this conversation as resolved.
Show resolved Hide resolved
SimonBrandner marked this conversation as resolved.
Show resolved Hide resolved
}
86 changes: 54 additions & 32 deletions pkg/conference/data_channel_message_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,30 @@ package conference

import (
"github.com/pion/webrtc/v3"
"github.com/sirupsen/logrus"
"golang.org/x/exp/slices"
"maunium.net/go/mautrix/event"
)

// Handle the `SFUMessage` event from the DataChannel message.
func (c *Conference) processSelectDCMessage(participant *Participant, msg event.SFUMessage) {
participant.logger.Info("Received select request over DC")
// Handle the `FocusEvent` from the DataChannel message.
func (c *Conference) processTrackSubscriptionDCMessage(
participant *Participant, msg event.FocusCallTrackSubscriptionEventContent,
) {
participant.logger.Info("Received track subscription request over DC")

// Find tracks based on what we were asked for.
tracks := c.getTracks(msg.Start)
tracks := c.getTracks(msg.Subscribe)

participant.logger.WithFields(logrus.Fields{
"tracks_we_got": tracks,
"tracks_we_want": msg,
}).Debug("Tracks to subscribe to")

// Let's check if we have all the tracks that we were asked for are there.
// If not, we will list which are not available (later on we must inform participant
// about it unless the participant retries it).
if len(tracks) != len(msg.Start) {
for _, expected := range msg.Start {
if len(tracks) != len(msg.Subscribe) {
for _, expected := range msg.Subscribe {
found := slices.IndexFunc(tracks, func(track *webrtc.TrackLocalStaticRTP) bool {
return track.ID() == expected.TrackID
})
Expand All @@ -28,47 +36,61 @@ func (c *Conference) processSelectDCMessage(participant *Participant, msg event.
}
}

// Subscribe to the found tracks.
// Subscribe to the found tracks
SimonBrandner marked this conversation as resolved.
Show resolved Hide resolved
for _, track := range tracks {
participant.logger.WithField("track_id", track.ID()).Debug("Subscribing to track")
if err := participant.peer.SubscribeTo(track); err != nil {
participant.logger.Errorf("Failed to subscribe to track: %v", err)
return
}
}
}

func (c *Conference) processAnswerDCMessage(participant *Participant, msg event.SFUMessage) {
participant.logger.Info("Received SDP answer over DC")

if err := participant.peer.ProcessSDPAnswer(msg.SDP); err != nil {
participant.logger.Errorf("Failed to set SDP answer: %v", err)
return
}
// TODO: Handle unsubscribe
SimonBrandner marked this conversation as resolved.
Show resolved Hide resolved
}

func (c *Conference) processPublishDCMessage(participant *Participant, msg event.SFUMessage) {
participant.logger.Info("Received SDP offer over DC")
func (c *Conference) processNegotiateDCMessage(participant *Participant, msg event.FocusCallNegotiateEventContent) {
participant.streamMetadata = msg.SDPStreamMetadata

answer, err := participant.peer.ProcessSDPOffer(msg.SDP)
if err != nil {
participant.logger.Errorf("Failed to set SDP offer: %v", err)
return
}
if msg.Description.Type == event.CallDataTypeOffer {
SimonBrandner marked this conversation as resolved.
Show resolved Hide resolved
participant.logger.Info("Received SDP offer over DC")
SimonBrandner marked this conversation as resolved.
Show resolved Hide resolved

participant.streamMetadata = msg.Metadata
answer, err := participant.peer.ProcessSDPOffer(msg.Description.SDP)
if err != nil {
participant.logger.Errorf("Failed to set SDP offer: %v", err)
return
}

participant.sendDataChannelMessage(event.Event{
Type: event.FocusCallNegotiate,
Content: event.Content{
Parsed: event.FocusCallNegotiateEventContent{
Description: event.CallData{
Type: event.CallDataType(answer.Type.String()),
SDP: answer.SDP,
},
SDPStreamMetadata: c.getAvailableStreamsFor(participant.id),
},
},
})
} else if msg.Description.Type == event.CallDataTypeAnswer {
participant.logger.Info("Received SDP answer over DC")

participant.sendDataChannelMessage(event.SFUMessage{
Op: event.SFUOperationAnswer,
SDP: answer.SDP,
Metadata: c.getAvailableStreamsFor(participant.id),
})
if err := participant.peer.ProcessSDPAnswer(msg.Description.SDP); err != nil {
participant.logger.Errorf("Failed to set SDP answer: %v", err)
return
}
} else {
participant.logger.Errorf("Unknown SDP description type")
}
}

func (c *Conference) processAliveDCMessage(participant *Participant) {
participant.peer.ProcessHeartbeat()
func (c *Conference) processPongDCMessage(participant *Participant) {
participant.peer.ProcessPong()
}

func (c *Conference) processMetadataDCMessage(participant *Participant, msg event.SFUMessage) {
participant.streamMetadata = msg.Metadata
func (c *Conference) processMetadataDCMessage(
participant *Participant, msg event.FocusCallSDPStreamMetadataChangedEventContent,
) {
participant.streamMetadata = msg.SDPStreamMetadata
c.resendMetadataToAllExcept(participant.id)
}
16 changes: 15 additions & 1 deletion pkg/conference/matrix_message_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,21 @@ func (c *Conference) onNewParticipant(participantID ParticipantID, inviteEvent *
messageSink := common.NewMessageSink(participantID, c.peerMessages)

keepAliveDeadline := time.Duration(c.config.KeepAliveTimeout) * time.Second
peer, answer, err := peer.NewPeer(inviteEvent.Offer.SDP, messageSink, logger, keepAliveDeadline)
pingInterval := time.Duration(c.config.PingInterval) * time.Second
sendPing := func() {
participant.sendDataChannelMessage(event.Event{
Type: event.FocusCallPing,
Content: event.Content{},
})
}
peer, answer, err := peer.NewPeer(
inviteEvent.Offer.SDP,
messageSink,
logger,
pingInterval,
keepAliveDeadline,
sendPing,
SimonBrandner marked this conversation as resolved.
Show resolved Hide resolved
)
if err != nil {
logger.WithError(err).Errorf("Failed to process SDP offer")
return err
Expand Down
5 changes: 2 additions & 3 deletions pkg/conference/participant.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package conference

import (
"encoding/json"
"time"

"github.com/matrix-org/waterfall/pkg/peer"
Expand Down Expand Up @@ -46,8 +45,8 @@ func (p *Participant) asMatrixRecipient() signaling.MatrixRecipient {
}
}

func (p *Participant) sendDataChannelMessage(toSend event.SFUMessage) {
jsonToSend, err := json.Marshal(toSend)
func (p *Participant) sendDataChannelMessage(toSend event.Event) {
jsonToSend, err := toSend.MarshalJSON()
if err != nil {
p.logger.Error("Failed to marshal data channel message")
return
Expand Down
56 changes: 35 additions & 21 deletions pkg/conference/peer_message_processor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package conference

import (
"encoding/json"
"time"

"github.com/matrix-org/waterfall/pkg/peer"
Expand Down Expand Up @@ -68,40 +67,55 @@ func (c *Conference) processICEGatheringCompleteMessage(participant *Participant

func (c *Conference) processRenegotiationRequiredMessage(participant *Participant, msg peer.RenegotiationRequired) {
participant.logger.Info("Started renegotiation")
participant.sendDataChannelMessage(event.SFUMessage{
Op: event.SFUOperationOffer,
SDP: msg.Offer.SDP,
Metadata: c.getAvailableStreamsFor(participant.id),
participant.sendDataChannelMessage(event.Event{
Type: event.FocusCallNegotiate,
Content: event.Content{
Parsed: event.FocusCallNegotiateEventContent{
Description: event.CallData{
Type: event.CallDataType(msg.Offer.Type.String()),
SDP: msg.Offer.SDP,
},
SDPStreamMetadata: c.getAvailableStreamsFor(participant.id),
},
},
})
}

func (c *Conference) processDataChannelMessage(participant *Participant, msg peer.DataChannelMessage) {
participant.logger.Debug("Received data channel message")
var sfuMessage event.SFUMessage
if err := json.Unmarshal([]byte(msg.Message), &sfuMessage); err != nil {
var focusEvent event.Event
if err := focusEvent.UnmarshalJSON([]byte(msg.Message)); err != nil {
c.logger.Errorf("Failed to unmarshal SFU message: %v", err)
return
}

switch sfuMessage.Op {
case event.SFUOperationSelect:
c.processSelectDCMessage(participant, sfuMessage)
case event.SFUOperationAnswer:
c.processAnswerDCMessage(participant, sfuMessage)
case event.SFUOperationPublish, event.SFUOperationUnpublish:
c.processPublishDCMessage(participant, sfuMessage)
case event.SFUOperationAlive:
c.processAliveDCMessage(participant)
case event.SFUOperationMetadata:
c.processMetadataDCMessage(participant, sfuMessage)
switch focusEvent.Type.Type {
case event.FocusCallTrackSubscription.Type:
focusEvent.Content.ParseRaw(event.FocusCallTrackSubscription)
SimonBrandner marked this conversation as resolved.
Show resolved Hide resolved
c.processTrackSubscriptionDCMessage(participant, *focusEvent.Content.AsFocusCallTrackSubscription())
case event.FocusCallNegotiate.Type:
focusEvent.Content.ParseRaw(event.FocusCallNegotiate)
c.processNegotiateDCMessage(participant, *focusEvent.Content.AsFocusCallNegotiate())
case event.FocusCallPong.Type:
focusEvent.Content.ParseRaw(event.FocusCallPong)
c.processPongDCMessage(participant)
case event.FocusCallSDPStreamMetadataChanged.Type:
focusEvent.Content.ParseRaw(event.FocusCallSDPStreamMetadataChanged)
c.processMetadataDCMessage(participant, *focusEvent.Content.AsFocusCallSDPStreamMetadataChanged())
default:
participant.logger.WithField("type", focusEvent.Type.Type).Warn("Received data channel message of unknown type")
}
}

func (c *Conference) processDataChannelAvailableMessage(participant *Participant, msg peer.DataChannelAvailable) {
participant.logger.Info("Connected data channel")
participant.sendDataChannelMessage(event.SFUMessage{
Op: event.SFUOperationMetadata,
Metadata: c.getAvailableStreamsFor(participant.id),
participant.sendDataChannelMessage(event.Event{
Type: event.FocusCallSDPStreamMetadataChanged,
Content: event.Content{
Parsed: event.FocusCallSDPStreamMetadataChangedEventContent{
SDPStreamMetadata: c.getAvailableStreamsFor(participant.id),
},
},
})
}

Expand Down
12 changes: 8 additions & 4 deletions pkg/conference/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (c *Conference) getAvailableStreamsFor(forParticipant ParticipantID) event.
}

// Helper that returns the list of streams inside this conference that match the given stream IDs and track IDs.
func (c *Conference) getTracks(identifiers []event.SFUTrackDescription) []*webrtc.TrackLocalStaticRTP {
func (c *Conference) getTracks(identifiers []event.FocusTrackDescription) []*webrtc.TrackLocalStaticRTP {
tracks := make([]*webrtc.TrackLocalStaticRTP, 0)
for _, participant := range c.participants {
// Check if this participant has any of the tracks that we're looking for.
Expand All @@ -112,9 +112,13 @@ func (c *Conference) getTracks(identifiers []event.SFUTrackDescription) []*webrt
func (c *Conference) resendMetadataToAllExcept(exceptMe ParticipantID) {
for participantID, participant := range c.participants {
if participantID != exceptMe {
participant.sendDataChannelMessage(event.SFUMessage{
Op: event.SFUOperationMetadata,
Metadata: c.getAvailableStreamsFor(participantID),
participant.sendDataChannelMessage(event.Event{
Type: event.FocusCallSDPStreamMetadataChanged,
Content: event.Content{
Parsed: event.FocusCallSDPStreamMetadataChangedEventContent{
SDPStreamMetadata: c.getAvailableStreamsFor(participantID),
},
},
})
}
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ func LoadConfigFromString(configString string) (*Config, error) {
if config.Matrix.UserID == "" ||
config.Matrix.HomeserverURL == "" ||
config.Matrix.AccessToken == "" ||
config.Conference.KeepAliveTimeout == 0 {
config.Conference.KeepAliveTimeout == 0 ||
config.Conference.KeepAliveTimeout > 30 ||
config.Conference.PingInterval < 30 {
dbkr marked this conversation as resolved.
Show resolved Hide resolved
return nil, errors.New("invalid config values")
}

Expand Down
16 changes: 12 additions & 4 deletions pkg/peer/keepalive.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,26 @@ package peer

import "time"

type HeartBeat struct{}
type Pong struct{}

// Starts a goroutine that will execute `onDeadLine` closure in case nothing has been published
// to the `heartBeat` channel for `deadline` duration. The goroutine stops once the channel is closed.
func startKeepAlive(deadline time.Duration, heartBeat <-chan HeartBeat, onDeadLine func()) {
func startKeepAlive(
interval time.Duration,
deadline time.Duration,
pong <-chan Pong,
sendPing func(),
onDeadLine func(),
) {
go func() {
for {
for range time.Tick(interval) {
sendPing()

select {
case <-time.After(deadline):
onDeadLine()
return
case _, ok := <-heartBeat:
case _, ok := <-pong:
if !ok {
return
}
Expand Down
Loading