From 2dfc6a90f3ca9d028ea6fed5efba8b54e8fb36fc Mon Sep 17 00:00:00 2001 From: Mathis Engelbart Date: Tue, 19 Mar 2024 22:27:30 +1000 Subject: [PATCH] Improve chat interoperability --- examples/chat/client.go | 18 +++++++++++++----- examples/chat/room.go | 2 +- examples/chat/server.go | 17 +++++++++++------ 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/examples/chat/client.go b/examples/chat/client.go index 635109a..deb84a0 100644 --- a/examples/chat/client.go +++ b/examples/chat/client.go @@ -81,7 +81,6 @@ func NewClient(p *moqtransport.Session) (*Client, error) { } go func() { for { - var a *moqtransport.Announcement a, err := c.session.ReadAnnouncement(context.Background()) if err != nil { panic(err) @@ -96,7 +95,7 @@ func NewClient(p *moqtransport.Session) (*Client, error) { if err != nil { panic(err) } - parts := strings.SplitN(s.Namespace(), "/", 2) + parts := strings.SplitN(s.Namespace(), "/", 4) if len(parts) < 2 { s.Reject(moqtransport.SubscribeErrorUnknownTrack, "invalid trackname") continue @@ -108,9 +107,12 @@ func NewClient(p *moqtransport.Session) (*Client, error) { } if _, ok := c.rooms[id]; !ok { s.Reject(moqtransport.SubscribeErrorUnknownTrack, "invalid subscribe request") + log.Printf("got subscribe request for unknown room: %v", id) continue } s.Accept() + // TODO: Should add to a list instead of overwriting or at least end + // previous subscriptions? c.rooms[id].st = s } }() @@ -128,11 +130,12 @@ func (c *Client) handleCatalogDeltas(roomID, username string, catalogTrack *moqt if err != nil { return err } + log.Printf("got catalog delta: %v", delta) for _, p := range delta.joined { if p == username { continue } - t, err := c.session.Subscribe(context.Background(), 0, 0, fmt.Sprintf("moq-chat/%v", roomID), p, username) + t, err := c.session.Subscribe(context.Background(), 2, 0, fmt.Sprintf("moq-chat/%v/participant/%v", roomID, p), "", username) if err != nil { return err } @@ -178,11 +181,12 @@ func (c *Client) joinRoom(roomID, username string) error { if err != nil { return err } + log.Printf("got catalog: %v", participants) for p := range participants.participants { if p == username { continue } - t, err := c.session.Subscribe(context.Background(), 2, 0, fmt.Sprintf("moq-chat/%v", roomID), p, username) + t, err := c.session.Subscribe(context.Background(), 2, 0, fmt.Sprintf("moq-chat/%v/participant/%v", roomID, p), "", username) if err != nil { log.Fatalf("failed to subscribe to participant track: %v", err) } @@ -231,9 +235,13 @@ func (c *Client) Run() error { fmt.Println("invalid msg command, usage: 'msg '") continue } + if c.rooms[fields[1]].st == nil { + fmt.Println("server not subscribed, dropping message") + continue + } w, err := c.rooms[fields[1]].st.NewObjectStream(0, 0, 0) // TODO if err != nil { - fmt.Printf("failed to send object: %v", err) + fmt.Printf("failed to send object: %v\n", err) continue } if _, err = w.Write([]byte(strings.TrimSpace(msg))); err != nil { diff --git a/examples/chat/room.go b/examples/chat/room.go index 6e93383..0abc1fa 100644 --- a/examples/chat/room.go +++ b/examples/chat/room.go @@ -48,7 +48,7 @@ func (r *room) join(username string, p *moqtransport.Session) error { if _, ok := r.publishers[username]; ok { return errors.New("username already taken") } - t, err := p.Subscribe(context.Background(), 0, 0, fmt.Sprintf("moq-chat/%v", r.id), username, "") + t, err := p.Subscribe(context.Background(), 0, 0, fmt.Sprintf("moq-chat/%v/participant/%v", r.id, username), "", "") if err != nil { return err } diff --git a/examples/chat/server.go b/examples/chat/server.go index 5106fec..96305c1 100644 --- a/examples/chat/server.go +++ b/examples/chat/server.go @@ -137,13 +137,13 @@ func (s *Server) handle(p *moqtransport.Session) { if err != nil { panic(err) } - uri := strings.SplitN(a.Namespace(), "/", 3) - if len(uri) < 3 { + uri := strings.SplitN(a.Namespace(), "/", 4) + if len(uri) < 4 { a.Reject(0, "invalid announcement") continue } - moq_chat, id, username := uri[0], uri[1], uri[2] - if moq_chat != "moq-chat" { + moq_chat, id, participant, username := uri[0], uri[1], uri[2], uri[3] + if moq_chat != "moq-chat" || participant != "participant" { a.Reject(0, "invalid moq-chat namespace") continue } @@ -170,7 +170,7 @@ func (s *Server) handle(p *moqtransport.Session) { sub.Reject(moqtransport.SubscribeErrorUnknownTrack, "subscribe without prior announcement") continue } - parts := strings.SplitN(sub.Namespace(), "/", 2) + parts := strings.SplitN(sub.Namespace(), "/", 4) if len(parts) < 2 { sub.Reject(moqtransport.SubscribeErrorUnknownTrack, "invalid trackname") continue @@ -198,7 +198,12 @@ func (s *Server) handle(p *moqtransport.Session) { r.lock.Lock() log.Printf("subscribing user %v to publisher %v", name, sub.Trackname()) - r.publishers[sub.Trackname()].subscribe(name, sub) + if len(parts) < 4 { + sub.Reject(0, "invalid subscriptions namespace, expected 'moq-chat//participants/") + continue + } + username := parts[3] + r.publishers[username].subscribe(name, sub) r.lock.Unlock() } }()