Skip to content

Commit

Permalink
Improve chat interoperability
Browse files Browse the repository at this point in the history
  • Loading branch information
mengelbart committed Mar 19, 2024
1 parent aa3dbc9 commit 2dfc6a9
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 12 deletions.
18 changes: 13 additions & 5 deletions examples/chat/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ func NewClient(p *moqtransport.Session) (*Client, error) {
}
go func() {
for {
var a *moqtransport.Announcement
a, err := c.session.ReadAnnouncement(context.Background())
if err != nil {
panic(err)
Expand All @@ -96,7 +95,7 @@ func NewClient(p *moqtransport.Session) (*Client, error) {
if err != nil {
panic(err)
}
parts := strings.SplitN(s.Namespace(), "/", 2)
parts := strings.SplitN(s.Namespace(), "/", 4)
if len(parts) < 2 {
s.Reject(moqtransport.SubscribeErrorUnknownTrack, "invalid trackname")
continue
Expand All @@ -108,9 +107,12 @@ func NewClient(p *moqtransport.Session) (*Client, error) {
}
if _, ok := c.rooms[id]; !ok {
s.Reject(moqtransport.SubscribeErrorUnknownTrack, "invalid subscribe request")
log.Printf("got subscribe request for unknown room: %v", id)
continue
}
s.Accept()
// TODO: Should add to a list instead of overwriting or at least end
// previous subscriptions?
c.rooms[id].st = s
}
}()
Expand All @@ -128,11 +130,12 @@ func (c *Client) handleCatalogDeltas(roomID, username string, catalogTrack *moqt
if err != nil {
return err
}
log.Printf("got catalog delta: %v", delta)
for _, p := range delta.joined {
if p == username {
continue
}
t, err := c.session.Subscribe(context.Background(), 0, 0, fmt.Sprintf("moq-chat/%v", roomID), p, username)
t, err := c.session.Subscribe(context.Background(), 2, 0, fmt.Sprintf("moq-chat/%v/participant/%v", roomID, p), "", username)
if err != nil {
return err
}
Expand Down Expand Up @@ -178,11 +181,12 @@ func (c *Client) joinRoom(roomID, username string) error {
if err != nil {
return err
}
log.Printf("got catalog: %v", participants)
for p := range participants.participants {
if p == username {
continue
}
t, err := c.session.Subscribe(context.Background(), 2, 0, fmt.Sprintf("moq-chat/%v", roomID), p, username)
t, err := c.session.Subscribe(context.Background(), 2, 0, fmt.Sprintf("moq-chat/%v/participant/%v", roomID, p), "", username)
if err != nil {
log.Fatalf("failed to subscribe to participant track: %v", err)
}
Expand Down Expand Up @@ -231,9 +235,13 @@ func (c *Client) Run() error {
fmt.Println("invalid msg command, usage: 'msg <room id> <msg>'")
continue
}
if c.rooms[fields[1]].st == nil {
fmt.Println("server not subscribed, dropping message")
continue
}
w, err := c.rooms[fields[1]].st.NewObjectStream(0, 0, 0) // TODO
if err != nil {
fmt.Printf("failed to send object: %v", err)
fmt.Printf("failed to send object: %v\n", err)
continue
}
if _, err = w.Write([]byte(strings.TrimSpace(msg))); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion examples/chat/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (r *room) join(username string, p *moqtransport.Session) error {
if _, ok := r.publishers[username]; ok {
return errors.New("username already taken")
}
t, err := p.Subscribe(context.Background(), 0, 0, fmt.Sprintf("moq-chat/%v", r.id), username, "")
t, err := p.Subscribe(context.Background(), 0, 0, fmt.Sprintf("moq-chat/%v/participant/%v", r.id, username), "", "")
if err != nil {
return err
}
Expand Down
17 changes: 11 additions & 6 deletions examples/chat/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,13 @@ func (s *Server) handle(p *moqtransport.Session) {
if err != nil {
panic(err)
}
uri := strings.SplitN(a.Namespace(), "/", 3)
if len(uri) < 3 {
uri := strings.SplitN(a.Namespace(), "/", 4)
if len(uri) < 4 {
a.Reject(0, "invalid announcement")
continue
}
moq_chat, id, username := uri[0], uri[1], uri[2]
if moq_chat != "moq-chat" {
moq_chat, id, participant, username := uri[0], uri[1], uri[2], uri[3]
if moq_chat != "moq-chat" || participant != "participant" {
a.Reject(0, "invalid moq-chat namespace")
continue
}
Expand All @@ -170,7 +170,7 @@ func (s *Server) handle(p *moqtransport.Session) {
sub.Reject(moqtransport.SubscribeErrorUnknownTrack, "subscribe without prior announcement")
continue
}
parts := strings.SplitN(sub.Namespace(), "/", 2)
parts := strings.SplitN(sub.Namespace(), "/", 4)
if len(parts) < 2 {
sub.Reject(moqtransport.SubscribeErrorUnknownTrack, "invalid trackname")
continue
Expand Down Expand Up @@ -198,7 +198,12 @@ func (s *Server) handle(p *moqtransport.Session) {

r.lock.Lock()
log.Printf("subscribing user %v to publisher %v", name, sub.Trackname())
r.publishers[sub.Trackname()].subscribe(name, sub)
if len(parts) < 4 {
sub.Reject(0, "invalid subscriptions namespace, expected 'moq-chat/<room-id>/participants/<username>")
continue
}
username := parts[3]
r.publishers[username].subscribe(name, sub)
r.lock.Unlock()
}
}()
Expand Down

0 comments on commit 2dfc6a9

Please sign in to comment.