Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change event shape to match MSC3898 #70

Merged
merged 10 commits into from
Dec 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ require (
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
)

replace maunium.net/go/mautrix => github.com/matrix-org/mautrix-go v0.0.0-20220817142816-160ea900a20b
replace maunium.net/go/mautrix => github.com/matrix-org/mautrix-go v0.0.0-20221210135932-bd593dd0204b
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/matrix-org/mautrix-go v0.0.0-20220817142816-160ea900a20b h1:qKvyphdDykNjyF1vJLaVuWCPfNJWNzP7wHvMV5mw+Ss=
github.com/matrix-org/mautrix-go v0.0.0-20220817142816-160ea900a20b/go.mod h1:hHvNi5iKVAiI2MAdAeXHtP4g9BvNEX2rsQpSF/x6Kx4=
github.com/matrix-org/mautrix-go v0.0.0-20221210135932-bd593dd0204b h1:yMsRQmsBWm7wJurYwnyd7H7wZWawhp52ca62W3MqDA8=
github.com/matrix-org/mautrix-go v0.0.0-20221210135932-bd593dd0204b/go.mod h1:hHvNi5iKVAiI2MAdAeXHtP4g9BvNEX2rsQpSF/x6Kx4=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand Down
8 changes: 6 additions & 2 deletions pkg/conference/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ package conference

// Configuration for the group conferences (calls).
type Config struct {
// Keep-alive timeout for WebRTC connections. If no keep-alive has been received
// from the client for this duration, the connection is considered dead (in seconds).
// Keep-alive timeout for WebRTC connections. If the client doesn't respond
// to an `m.call.ping` with an `m.call.pong` for this amount of time, the
// connection is considered dead. (in seconds, no greater then 30)
KeepAliveTimeout int `yaml:"timeout"`
// The time after which we should send another m.call.ping event to the
// client. (in seconds, greater then 30)
PingInterval int `yaml:"pingInterval"`
SimonBrandner marked this conversation as resolved.
Show resolved Hide resolved
SimonBrandner marked this conversation as resolved.
Show resolved Hide resolved
}
85 changes: 54 additions & 31 deletions pkg/conference/data_channel_message_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,32 @@ package conference

import (
"github.com/pion/webrtc/v3"
"github.com/sirupsen/logrus"
"golang.org/x/exp/slices"
"maunium.net/go/mautrix/event"
)

// Handle the `SFUMessage` event from the DataChannel message.
func (c *Conference) processSelectDCMessage(participant *Participant, msg event.SFUMessage) {
participant.logger.Info("Received select request over DC")
// Handle the `FocusEvent` from the DataChannel message.
func (c *Conference) processTrackSubscriptionDCMessage(
participant *Participant, msg event.FocusCallTrackSubscriptionEventContent,
) {
participant.logger.Info("Received track subscription request over DC")

// TODO: Handle unsubscribe

// Find tracks based on what we were asked for.
tracks := c.getTracks(msg.Start)
tracks := c.getTracks(msg.Subscribe)

participant.logger.WithFields(logrus.Fields{
"tracks_we_got": tracks,
"tracks_we_want": msg,
}).Debug("Tracks to subscribe to")

// Let's check if we have all the tracks that we were asked for are there.
// If not, we will list which are not available (later on we must inform participant
// about it unless the participant retries it).
if len(tracks) != len(msg.Start) {
for _, expected := range msg.Start {
if len(tracks) != len(msg.Subscribe) {
for _, expected := range msg.Subscribe {
found := slices.IndexFunc(tracks, func(track *webrtc.TrackLocalStaticRTP) bool {
return track.ID() == expected.TrackID
})
Expand All @@ -30,45 +40,58 @@ func (c *Conference) processSelectDCMessage(participant *Participant, msg event.

// Subscribe to the found tracks.
for _, track := range tracks {
participant.logger.WithField("track_id", track.ID()).Debug("Subscribing to track")
if err := participant.peer.SubscribeTo(track); err != nil {
participant.logger.Errorf("Failed to subscribe to track: %v", err)
return
}
}
}

func (c *Conference) processAnswerDCMessage(participant *Participant, msg event.SFUMessage) {
participant.logger.Info("Received SDP answer over DC")

if err := participant.peer.ProcessSDPAnswer(msg.SDP); err != nil {
participant.logger.Errorf("Failed to set SDP answer: %v", err)
return
}
}
func (c *Conference) processNegotiateDCMessage(participant *Participant, msg event.FocusCallNegotiateEventContent) {
participant.streamMetadata = msg.SDPStreamMetadata

func (c *Conference) processPublishDCMessage(participant *Participant, msg event.SFUMessage) {
participant.logger.Info("Received SDP offer over DC")
switch msg.Description.Type {
case event.CallDataTypeOffer:
participant.logger.WithField("SDP", msg.Description.SDP).Trace("Received SDP offer over DC")

answer, err := participant.peer.ProcessSDPOffer(msg.SDP)
if err != nil {
participant.logger.Errorf("Failed to set SDP offer: %v", err)
return
}
answer, err := participant.peer.ProcessSDPOffer(msg.Description.SDP)
if err != nil {
participant.logger.Errorf("Failed to set SDP offer: %v", err)
return
}

participant.streamMetadata = msg.Metadata
participant.sendDataChannelMessage(event.Event{
Type: event.FocusCallNegotiate,
Content: event.Content{
Parsed: event.FocusCallNegotiateEventContent{
Description: event.CallData{
Type: event.CallDataType(answer.Type.String()),
SDP: answer.SDP,
},
SDPStreamMetadata: c.getAvailableStreamsFor(participant.id),
},
},
})
case event.CallDataTypeAnswer:
participant.logger.WithField("SDP", msg.Description.SDP).Trace("Received SDP answer over DC")

participant.sendDataChannelMessage(event.SFUMessage{
Op: event.SFUOperationAnswer,
SDP: answer.SDP,
Metadata: c.getAvailableStreamsFor(participant.id),
})
if err := participant.peer.ProcessSDPAnswer(msg.Description.SDP); err != nil {
participant.logger.Errorf("Failed to set SDP answer: %v", err)
return
}
default:
participant.logger.Errorf("Unknown SDP description type")
}
}

func (c *Conference) processAliveDCMessage(participant *Participant) {
participant.peer.ProcessHeartbeat()
func (c *Conference) processPongDCMessage(participant *Participant) {
participant.peer.ProcessPong()
}

func (c *Conference) processMetadataDCMessage(participant *Participant, msg event.SFUMessage) {
participant.streamMetadata = msg.Metadata
func (c *Conference) processMetadataDCMessage(
participant *Participant, msg event.FocusCallSDPStreamMetadataChangedEventContent,
) {
participant.streamMetadata = msg.SDPStreamMetadata
c.resendMetadataToAllExcept(participant.id)
}
21 changes: 19 additions & 2 deletions pkg/conference/matrix_message_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,25 @@ func (c *Conference) onNewParticipant(participantID ParticipantID, inviteEvent *
} else {
messageSink := common.NewMessageSink(participantID, c.peerMessages)

keepAliveDeadline := time.Duration(c.config.KeepAliveTimeout) * time.Second
peer, answer, err := peer.NewPeer(inviteEvent.Offer.SDP, messageSink, logger, keepAliveDeadline)
peer, answer, err := peer.NewPeer(
inviteEvent.Offer.SDP,
messageSink,
logger,
peer.PingPongConfig{
Interval: time.Duration(c.config.PingInterval) * time.Second,
Deadline: time.Duration(c.config.KeepAliveTimeout) * time.Second,
PongChannel: make(chan peer.Pong, common.UnboundedChannelSize),
SendPing: func() {
participant.sendDataChannelMessage(event.Event{
Type: event.FocusCallPing,
Content: event.Content{},
})
},
OnDeadLine: func() {
messageSink.Send(peer.LeftTheCall{Reason: event.CallHangupKeepAliveTimeout})
},
},
)
if err != nil {
logger.WithError(err).Errorf("Failed to process SDP offer")
return err
Expand Down
5 changes: 2 additions & 3 deletions pkg/conference/participant.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package conference

import (
"encoding/json"
"time"

"github.com/matrix-org/waterfall/pkg/peer"
Expand Down Expand Up @@ -46,8 +45,8 @@ func (p *Participant) asMatrixRecipient() signaling.MatrixRecipient {
}
}

func (p *Participant) sendDataChannelMessage(toSend event.SFUMessage) {
jsonToSend, err := json.Marshal(toSend)
func (p *Participant) sendDataChannelMessage(toSend event.Event) {
jsonToSend, err := toSend.MarshalJSON()
if err != nil {
p.logger.Error("Failed to marshal data channel message")
return
Expand Down
58 changes: 37 additions & 21 deletions pkg/conference/peer_message_processor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package conference

import (
"encoding/json"
"time"

"github.com/matrix-org/waterfall/pkg/peer"
Expand Down Expand Up @@ -68,40 +67,57 @@ func (c *Conference) processICEGatheringCompleteMessage(participant *Participant

func (c *Conference) processRenegotiationRequiredMessage(participant *Participant, msg peer.RenegotiationRequired) {
participant.logger.Info("Started renegotiation")
participant.sendDataChannelMessage(event.SFUMessage{
Op: event.SFUOperationOffer,
SDP: msg.Offer.SDP,
Metadata: c.getAvailableStreamsFor(participant.id),
participant.sendDataChannelMessage(event.Event{
Type: event.FocusCallNegotiate,
Content: event.Content{
Parsed: event.FocusCallNegotiateEventContent{
Description: event.CallData{
Type: event.CallDataType(msg.Offer.Type.String()),
SDP: msg.Offer.SDP,
},
SDPStreamMetadata: c.getAvailableStreamsFor(participant.id),
},
},
})
}

func (c *Conference) processDataChannelMessage(participant *Participant, msg peer.DataChannelMessage) {
participant.logger.Debug("Received data channel message")
var sfuMessage event.SFUMessage
if err := json.Unmarshal([]byte(msg.Message), &sfuMessage); err != nil {
var focusEvent event.Event
if err := focusEvent.UnmarshalJSON([]byte(msg.Message)); err != nil {
c.logger.Errorf("Failed to unmarshal SFU message: %v", err)
return
}

switch sfuMessage.Op {
case event.SFUOperationSelect:
c.processSelectDCMessage(participant, sfuMessage)
case event.SFUOperationAnswer:
c.processAnswerDCMessage(participant, sfuMessage)
case event.SFUOperationPublish, event.SFUOperationUnpublish:
c.processPublishDCMessage(participant, sfuMessage)
case event.SFUOperationAlive:
c.processAliveDCMessage(participant)
case event.SFUOperationMetadata:
c.processMetadataDCMessage(participant, sfuMessage)
// FIXME: We should be able to do
// focusEvent.Content.ParseRaw(focusEvent.Type) but it throws an error.
switch focusEvent.Type.Type {
case event.FocusCallTrackSubscription.Type:
focusEvent.Content.ParseRaw(event.FocusCallTrackSubscription)
SimonBrandner marked this conversation as resolved.
Show resolved Hide resolved
c.processTrackSubscriptionDCMessage(participant, *focusEvent.Content.AsFocusCallTrackSubscription())
case event.FocusCallNegotiate.Type:
focusEvent.Content.ParseRaw(event.FocusCallNegotiate)
c.processNegotiateDCMessage(participant, *focusEvent.Content.AsFocusCallNegotiate())
case event.FocusCallPong.Type:
focusEvent.Content.ParseRaw(event.FocusCallPong)
c.processPongDCMessage(participant)
case event.FocusCallSDPStreamMetadataChanged.Type:
focusEvent.Content.ParseRaw(event.FocusCallSDPStreamMetadataChanged)
c.processMetadataDCMessage(participant, *focusEvent.Content.AsFocusCallSDPStreamMetadataChanged())
default:
participant.logger.WithField("type", focusEvent.Type.Type).Warn("Received data channel message of unknown type")
}
}

func (c *Conference) processDataChannelAvailableMessage(participant *Participant, msg peer.DataChannelAvailable) {
participant.logger.Info("Connected data channel")
participant.sendDataChannelMessage(event.SFUMessage{
Op: event.SFUOperationMetadata,
Metadata: c.getAvailableStreamsFor(participant.id),
participant.sendDataChannelMessage(event.Event{
Type: event.FocusCallSDPStreamMetadataChanged,
Content: event.Content{
Parsed: event.FocusCallSDPStreamMetadataChangedEventContent{
SDPStreamMetadata: c.getAvailableStreamsFor(participant.id),
},
},
})
}

Expand Down
12 changes: 8 additions & 4 deletions pkg/conference/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (c *Conference) getAvailableStreamsFor(forParticipant ParticipantID) event.
}

// Helper that returns the list of streams inside this conference that match the given stream IDs and track IDs.
func (c *Conference) getTracks(identifiers []event.SFUTrackDescription) []*webrtc.TrackLocalStaticRTP {
func (c *Conference) getTracks(identifiers []event.FocusTrackDescription) []*webrtc.TrackLocalStaticRTP {
tracks := make([]*webrtc.TrackLocalStaticRTP, 0)
for _, participant := range c.participants {
// Check if this participant has any of the tracks that we're looking for.
Expand All @@ -112,9 +112,13 @@ func (c *Conference) getTracks(identifiers []event.SFUTrackDescription) []*webrt
func (c *Conference) resendMetadataToAllExcept(exceptMe ParticipantID) {
for participantID, participant := range c.participants {
if participantID != exceptMe {
participant.sendDataChannelMessage(event.SFUMessage{
Op: event.SFUOperationMetadata,
Metadata: c.getAvailableStreamsFor(participantID),
participant.sendDataChannelMessage(event.Event{
Type: event.FocusCallSDPStreamMetadataChanged,
Content: event.Content{
Parsed: event.FocusCallSDPStreamMetadataChangedEventContent{
SDPStreamMetadata: c.getAvailableStreamsFor(participantID),
},
},
})
}
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,13 @@ func LoadConfigFromString(configString string) (*Config, error) {
return nil, fmt.Errorf("failed to unmarshal YAML file: %w", err)
}

// TODO: We should split these up and add error messages
if config.Matrix.UserID == "" ||
config.Matrix.HomeserverURL == "" ||
config.Matrix.AccessToken == "" ||
config.Conference.KeepAliveTimeout == 0 {
config.Conference.KeepAliveTimeout == 0 ||
config.Conference.KeepAliveTimeout > 30 ||
config.Conference.PingInterval < 30 {
dbkr marked this conversation as resolved.
Show resolved Hide resolved
return nil, errors.New("invalid config values")
}

Expand Down
23 changes: 0 additions & 23 deletions pkg/peer/keepalive.go

This file was deleted.

Loading