Skip to content

Commit

Permalink
server: allow WritePacket() to return an error
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Aug 14, 2023
1 parent 18ddae4 commit b475c18
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 36 deletions.
58 changes: 35 additions & 23 deletions server_play_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,9 +568,11 @@ func TestServerPlay(t *testing.T) {
// with StatusOK; therefore we must wait before calling
// ServerStream.WritePacket*()
go func() {
time.Sleep(1 * time.Second)
stream.WritePacketRTCP(stream.Medias()[0], &testRTCPPacket)
stream.WritePacketRTP(stream.Medias()[0], &testRTPPacket)
time.Sleep(500 * time.Millisecond)
err := stream.WritePacketRTCP(stream.Medias()[0], &testRTCPPacket)
require.NoError(t, err)
err = stream.WritePacketRTP(stream.Medias()[0], &testRTPPacket)
require.NoError(t, err)
}()

return &base.Response{
Expand Down Expand Up @@ -1042,14 +1044,15 @@ func TestServerPlayRTCPReport(t *testing.T) {
doPlay(t, conn, "rtsp://localhost:8554/teststream", session)

for i := 0; i < 2; i++ {
stream.WritePacketRTP(stream.Medias()[0], &rtp.Packet{
err := stream.WritePacketRTP(stream.Medias()[0], &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
SSRC: 0x38F27A2F,
},
Payload: []byte{0x05}, // IDR
})
require.NoError(t, err)
}

var buf []byte
Expand Down Expand Up @@ -1161,15 +1164,17 @@ func TestServerPlayTCPResponseBeforeFrames(t *testing.T) {
go func() {
defer close(writerDone)

stream.WritePacketRTP(stream.Medias()[0], &testRTPPacket)
err := stream.WritePacketRTP(stream.Medias()[0], &testRTPPacket)
require.NoError(t, err)

t := time.NewTicker(50 * time.Millisecond)
defer t.Stop()
ti := time.NewTicker(50 * time.Millisecond)
defer ti.Stop()

for {
select {
case <-t.C:
stream.WritePacketRTP(stream.Medias()[0], &testRTPPacket)
case <-ti.C:
err := stream.WritePacketRTP(stream.Medias()[0], &testRTPPacket)
require.NoError(t, err)
case <-writerTerminate:
return
}
Expand Down Expand Up @@ -1308,13 +1313,14 @@ func TestServerPlayPlayPausePlay(t *testing.T) {
go func() {
defer close(writerDone)

t := time.NewTicker(50 * time.Millisecond)
defer t.Stop()
ti := time.NewTicker(50 * time.Millisecond)
defer ti.Stop()

for {
select {
case <-t.C:
stream.WritePacketRTP(stream.Medias()[0], &testRTPPacket)
case <-ti.C:
err := stream.WritePacketRTP(stream.Medias()[0], &testRTPPacket)
require.NoError(t, err)
case <-writerTerminate:
return
}
Expand Down Expand Up @@ -1395,13 +1401,14 @@ func TestServerPlayPlayPausePause(t *testing.T) {
go func() {
defer close(writerDone)

t := time.NewTicker(50 * time.Millisecond)
defer t.Stop()
ti := time.NewTicker(50 * time.Millisecond)
defer ti.Stop()

for {
select {
case <-t.C:
stream.WritePacketRTP(stream.Medias()[0], &testRTPPacket)
case <-ti.C:
err := stream.WritePacketRTP(stream.Medias()[0], &testRTPPacket)
require.NoError(t, err)
case <-writerTerminate:
return
}
Expand Down Expand Up @@ -1743,9 +1750,11 @@ func TestServerPlayPartialMedias(t *testing.T) {
},
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
go func() {
time.Sleep(1 * time.Second)
stream.WritePacketRTP(stream.Medias()[0], &testRTPPacket)
stream.WritePacketRTP(stream.Medias()[1], &testRTPPacket)
time.Sleep(500 * time.Millisecond)
err := stream.WritePacketRTP(stream.Medias()[0], &testRTPPacket)
require.NoError(t, err)
err = stream.WritePacketRTP(stream.Medias()[1], &testRTPPacket)
require.NoError(t, err)
}()

return &base.Response{
Expand Down Expand Up @@ -1891,7 +1900,7 @@ func TestServerPlayAdditionalInfos(t *testing.T) {
require.NoError(t, err)
defer s.Close()

stream.WritePacketRTP(stream.Medias()[0], &rtp.Packet{
err = stream.WritePacketRTP(stream.Medias()[0], &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
Expand All @@ -1901,6 +1910,7 @@ func TestServerPlayAdditionalInfos(t *testing.T) {
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
})
require.NoError(t, err)

rtpInfo, ssrcs := getInfos()
require.True(t, strings.HasPrefix(mustParseURL((*rtpInfo)[0].URL).Path, "/teststream/trackID="))
Expand All @@ -1920,7 +1930,7 @@ func TestServerPlayAdditionalInfos(t *testing.T) {
nil,
}, ssrcs)

stream.WritePacketRTP(stream.Medias()[1], &rtp.Packet{
err = stream.WritePacketRTP(stream.Medias()[1], &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
Expand All @@ -1930,6 +1940,7 @@ func TestServerPlayAdditionalInfos(t *testing.T) {
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
})
require.NoError(t, err)

rtpInfo, ssrcs = getInfos()
require.True(t, strings.HasPrefix(mustParseURL((*rtpInfo)[0].URL).Path, "/teststream/trackID="))
Expand Down Expand Up @@ -2036,7 +2047,8 @@ func TestServerPlayNoInterleavedIDs(t *testing.T) {
doPlay(t, conn, "rtsp://localhost:8554/teststream", session)

for i := 0; i < 2; i++ {
stream.WritePacketRTP(stream.Medias()[i], &testRTPPacket)
err := stream.WritePacketRTP(stream.Medias()[i], &testRTPPacket)
require.NoError(t, err)

f, err := conn.ReadInterleavedFrame()
require.NoError(t, err)
Expand Down
16 changes: 8 additions & 8 deletions server_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,34 +252,34 @@ func (st *ServerStream) readerSetInactive(ss *ServerSession) {
}

// WritePacketRTP writes a RTP packet to all the readers of the stream.
func (st *ServerStream) WritePacketRTP(medi *media.Media, pkt *rtp.Packet) {
st.WritePacketRTPWithNTP(medi, pkt, time.Now())
func (st *ServerStream) WritePacketRTP(medi *media.Media, pkt *rtp.Packet) error {
return st.WritePacketRTPWithNTP(medi, pkt, time.Now())
}

// WritePacketRTPWithNTP writes a RTP packet to all the readers of the stream.
// ntp is the absolute time of the packet, and is needed to generate RTCP sender reports
// that allows the receiver to reconstruct the absolute time of the packet.
func (st *ServerStream) WritePacketRTPWithNTP(medi *media.Media, pkt *rtp.Packet, ntp time.Time) {
func (st *ServerStream) WritePacketRTPWithNTP(medi *media.Media, pkt *rtp.Packet, ntp time.Time) error {
st.mutex.RLock()
defer st.mutex.RUnlock()

if st.closed {
return
return fmt.Errorf("stream is closed")

Check warning on line 267 in server_stream.go

View check run for this annotation

Codecov / codecov/patch

server_stream.go#L267

Added line #L267 was not covered by tests
}

sm := st.streamMedias[medi]
sm.WritePacketRTPWithNTP(st, pkt, ntp)
return sm.writePacketRTPWithNTP(st, pkt, ntp)
}

// WritePacketRTCP writes a RTCP packet to all the readers of the stream.
func (st *ServerStream) WritePacketRTCP(medi *media.Media, pkt rtcp.Packet) {
func (st *ServerStream) WritePacketRTCP(medi *media.Media, pkt rtcp.Packet) error {
st.mutex.RLock()
defer st.mutex.RUnlock()

if st.closed {
return
return fmt.Errorf("stream is closed")

Check warning on line 280 in server_stream.go

View check run for this annotation

Codecov / codecov/patch

server_stream.go#L280

Added line #L280 was not covered by tests
}

sm := st.streamMedias[medi]
sm.writePacketRTCP(st, pkt)
return sm.writePacketRTCP(st, pkt)
}
14 changes: 9 additions & 5 deletions server_stream_media.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func newServerStreamMedia(st *ServerStream, medi *media.Media, trackID int) *ser
tr.rtcpSender = rtcpsender.New(
forma.ClockRate(),
func(pkt rtcp.Packet) {
st.WritePacketRTCP(cmedia, pkt)
st.WritePacketRTCP(cmedia, pkt) //nolint:errcheck
},
)

Expand Down Expand Up @@ -67,11 +67,11 @@ func (sm *serverStreamMedia) allocateMulticastHandler(s *Server) error {
return nil
}

func (sm *serverStreamMedia) WritePacketRTPWithNTP(ss *ServerStream, pkt *rtp.Packet, ntp time.Time) {
func (sm *serverStreamMedia) writePacketRTPWithNTP(ss *ServerStream, pkt *rtp.Packet, ntp time.Time) error {
byts := make([]byte, udpMaxPayloadSize)
n, err := pkt.MarshalTo(byts)
if err != nil {
return
return err

Check warning on line 74 in server_stream_media.go

View check run for this annotation

Codecov / codecov/patch

server_stream_media.go#L74

Added line #L74 was not covered by tests
}
byts = byts[:n]

Expand All @@ -91,12 +91,14 @@ func (sm *serverStreamMedia) WritePacketRTPWithNTP(ss *ServerStream, pkt *rtp.Pa
if sm.multicastWriter != nil {
sm.multicastWriter.writePacketRTP(byts)
}

return nil
}

func (sm *serverStreamMedia) writePacketRTCP(ss *ServerStream, pkt rtcp.Packet) {
func (sm *serverStreamMedia) writePacketRTCP(ss *ServerStream, pkt rtcp.Packet) error {
byts, err := pkt.Marshal()
if err != nil {
return
return err

Check warning on line 101 in server_stream_media.go

View check run for this annotation

Codecov / codecov/patch

server_stream_media.go#L101

Added line #L101 was not covered by tests
}

// send unicast
Expand All @@ -111,4 +113,6 @@ func (sm *serverStreamMedia) writePacketRTCP(ss *ServerStream, pkt rtcp.Packet)
if sm.multicastWriter != nil {
sm.multicastWriter.writePacketRTCP(byts)
}

return nil
}

0 comments on commit b475c18

Please sign in to comment.