Skip to content

Commit

Permalink
fix race in connection timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Feb 14, 2024
1 parent bd78a36 commit 43c7f77
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 12 deletions.
10 changes: 9 additions & 1 deletion p2p/transport/webrtc/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,12 @@ func (c *connection) closeWithError(err error) {
c.cancel()
// closing peerconnection will close the datachannels associated with the streams
c.pc.Close()
for _, s := range c.streams {

c.m.Lock()
streams := c.streams
c.streams = nil
c.m.Unlock()
for _, s := range streams {
s.closeForShutdown(err)
}
c.scope.Done()
Expand Down Expand Up @@ -207,6 +212,9 @@ func (c *connection) Transport() tpt.Transport { return c.transport }
func (c *connection) addStream(str *stream) error {
c.m.Lock()
defer c.m.Unlock()
if c.streams == nil {
return c.closeErr
}
if _, ok := c.streams[str.id]; ok {
return errors.New("stream ID already exists")
}
Expand Down
10 changes: 7 additions & 3 deletions p2p/transport/webrtc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ func newStream(
func (s *stream) Close() error {
s.mx.Lock()
if s.closeForShutdownErr != nil {
return s.closeForShutdownErr
s.mx.Unlock()
return nil
}
s.mx.Unlock()

Expand All @@ -166,6 +167,7 @@ func (s *stream) Close() error {
func (s *stream) Reset() error {
s.mx.Lock()
if s.closeForShutdownErr != nil {
s.mx.Unlock()
return nil
}
s.mx.Unlock()
Expand All @@ -180,6 +182,8 @@ func (s *stream) Reset() error {
}

func (s *stream) closeForShutdown(closeErr error) {
defer s.cleanup()

s.mx.Lock()
defer s.mx.Unlock()

Expand All @@ -189,7 +193,6 @@ func (s *stream) closeForShutdown(closeErr error) {
case s.sendStateChanged <- struct{}{}:
default:
}
s.cleanup()
}

func (s *stream) SetDeadline(t time.Time) error {
Expand Down Expand Up @@ -275,7 +278,8 @@ func (s *stream) spawnControlMessageReader() {
s.processIncomingFlag(s.nextMessage.Flag)
s.nextMessage = nil
}
for s.sendState != sendStateDataReceived && s.sendState != sendStateReset {
for s.closeForShutdownErr == nil &&
s.sendState != sendStateDataReceived && s.sendState != sendStateReset {
var msg pb.Message
if !setDeadline() {
return
Expand Down
12 changes: 4 additions & 8 deletions p2p/transport/webrtc/stream_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ func (s *stream) Read(b []byte) (int, error) {
var msg pb.Message
if err := s.reader.ReadMsg(&msg); err != nil {
s.mx.Lock()
// connection was closed
if s.closeForShutdownErr != nil {
return 0, s.closeForShutdownErr
}
if err == io.EOF {
// connection was closed
if s.closeForShutdownErr != nil {
return 0, s.closeForShutdownErr
}
// if the channel was properly closed, return EOF
if s.receiveState == receiveStateDataRead {
return 0, io.EOF
Expand All @@ -59,10 +59,6 @@ func (s *stream) Read(b []byte) (int, error) {
if s.receiveState == receiveStateDataRead {
return 0, io.EOF
}
// connection was closed
if s.closeForShutdownErr != nil {
return 0, s.closeForShutdownErr
}
return 0, err
}
s.mx.Lock()
Expand Down

0 comments on commit 43c7f77

Please sign in to comment.