webrtc: wait for fin_ack for closing datachannel
sukunrt committed Dec 5, 2023
1 parent 97b4ca0 commit 0fe1285
Showing 13 changed files with 625 additions and 232 deletions.
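The change has two layers: the WebRTC transport keeps a closing datachannel's resources alive until the remote peer acknowledges the close with a FIN_ACK control message (or a deadline expires), and a new network.AsyncCloser interface lets the swarm defer its own stream bookkeeping until that point. Below is only a rough Go sketch of that close handshake; the names msg, msgFIN, msgFINACK, closer, and remote are invented for illustration, the real webrtc package uses protobuf control messages and different internals, and the timeout value is an assumption.

package main

import (
	"fmt"
	"time"
)

// Toy model of the close handshake: the closing side sends FIN and keeps its
// resources until it sees FIN_ACK (or gives up after a deadline); the remote
// side answers every FIN with a FIN_ACK.
type msg int

const (
	msgFIN msg = iota
	msgFINACK
)

func closer(out chan<- msg, in <-chan msg) {
	out <- msgFIN // tell the peer we are done writing
	select {
	case m := <-in:
		if m == msgFINACK {
			fmt.Println("FIN_ACK received, safe to free the datachannel")
		}
	case <-time.After(10 * time.Second): // assumed deadline, not the real value
		fmt.Println("timed out waiting for FIN_ACK, freeing anyway")
	}
}

func remote(in <-chan msg, out chan<- msg) {
	if <-in == msgFIN {
		out <- msgFINACK // acknowledge so the closer can release its resources
	}
}

func main() {
	aToB := make(chan msg, 1) // closer -> remote
	bToA := make(chan msg, 1) // remote -> closer
	go remote(aToB, bToA)
	closer(aToB, bToA)
}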
7 changes: 7 additions & 0 deletions core/network/mux.go
@@ -61,6 +61,13 @@ type MuxedStream interface {
	SetWriteDeadline(time.Time) error
}

// AsyncCloser is implemented by streams that need to perform expensive operations on close
// before releasing resources. Closing the stream asynchronously avoids blocking the calling goroutine.
type AsyncCloser interface {
	// AsyncClose closes the stream and executes onDone after the stream is closed.
	AsyncClose(onDone func()) error
}

// MuxedConn represents a connection to a remote peer that has been
// extended to support stream multiplexing.
//
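A stream that needs this behaviour can satisfy the AsyncCloser interface above by doing the slow part of its close path off the caller's goroutine and invoking onDone only once its resources are really gone. A minimal sketch under stated assumptions: waitForFINACK and release are hypothetical stand-ins for whatever blocking wait and cleanup the concrete stream performs, not functions from the webrtc package.

package sketch

import (
	"github.com/libp2p/go-libp2p/core/network"
)

// asyncStream is a hypothetical implementer of network.AsyncCloser.
type asyncStream struct {
	network.MuxedStream
	waitForFINACK func() // assumed: blocks until the peer acks the close, or a timeout fires
	release       func() // assumed: frees buffers and detaches the underlying datachannel
}

func (s *asyncStream) AsyncClose(onDone func()) error {
	// Close the stream as usual; this part stays synchronous.
	if err := s.MuxedStream.Close(); err != nil {
		return err
	}
	go func() {
		// The expensive wait happens off the caller's goroutine.
		s.waitForFINACK()
		s.release()
		onDone() // e.g. the swarm removes the stream from its tracking here
	}()
	return nil
}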
6 changes: 6 additions & 0 deletions p2p/net/swarm/swarm_stream.go
@@ -78,6 +78,12 @@ func (s *Stream) Write(p []byte) (int, error) {
// Close closes the stream, closing both ends and freeing all associated
// resources.
func (s *Stream) Close() error {
	if as, ok := s.stream.(network.AsyncCloser); ok {
		err := as.AsyncClose(func() {
			s.closeAndRemoveStream()
		})
		return err
	}
	err := s.stream.Close()
	s.closeAndRemoveStream()
	return err
45 changes: 45 additions & 0 deletions p2p/net/swarm/swarm_stream_test.go
@@ -0,0 +1,45 @@
package swarm

import (
	"context"
	"sync/atomic"
	"testing"

	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peerstore"
	"github.com/stretchr/testify/require"
)

type asyncStreamWrapper struct {
	network.MuxedStream
	beforeClose func()
}

func (s *asyncStreamWrapper) AsyncClose(onDone func()) error {
	s.beforeClose()
	err := s.Close()
	onDone()
	return err
}

func TestStreamAsyncCloser(t *testing.T) {
	s1 := makeSwarm(t)
	s2 := makeSwarm(t)

	s1.Peerstore().AddAddrs(s2.LocalPeer(), s2.ListenAddresses(), peerstore.TempAddrTTL)
	s, err := s1.NewStream(context.Background(), s2.LocalPeer())
	require.NoError(t, err)
	ss, ok := s.(*Stream)
	require.True(t, ok)

	var called atomic.Bool
	as := &asyncStreamWrapper{
		MuxedStream: ss.stream,
		beforeClose: func() {
			called.Store(true)
		},
	}
	ss.stream = as
	ss.Close()
	require.True(t, called.Load())
}
83 changes: 83 additions & 0 deletions p2p/test/swarm/swarm_test.go
@@ -2,6 +2,7 @@ package swarm_test

import (
	"context"
	"fmt"
	"io"
	"sync"
	"testing"
@@ -14,6 +15,7 @@ import (
	rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
	"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
	"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
	libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc"
	ma "github.com/multiformats/go-multiaddr"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
@@ -243,3 +245,84 @@ func TestLimitStreamsWhenHangingHandlers(t *testing.T) {
		return false
	}, 5*time.Second, 100*time.Millisecond)
}

func TestLimitStreamsWhenHangingHandlersWebRTC(t *testing.T) {
	var partial rcmgr.PartialLimitConfig
	const streamLimit = 10
	partial.System.Streams = streamLimit
	mgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(partial.Build(rcmgr.InfiniteLimits)))
	require.NoError(t, err)

	maddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/0/webrtc-direct")
	require.NoError(t, err)

	receiver, err := libp2p.New(
		libp2p.ResourceManager(mgr),
		libp2p.ListenAddrs(maddr),
		libp2p.Transport(libp2pwebrtc.New),
	)
	require.NoError(t, err)
	t.Cleanup(func() { receiver.Close() })

	var wg sync.WaitGroup
	wg.Add(1)

	const pid = "/test"
	receiver.SetStreamHandler(pid, func(s network.Stream) {
		defer s.Close()
		s.Write([]byte{42})
		wg.Wait()
	})

	// Open streamLimit streams
	success := 0
	// we make a lot of tries because identify and identify push take up a few streams
	for i := 0; i < 1000 && success < streamLimit; i++ {
		mgr, err = rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits))
		require.NoError(t, err)

		sender, err := libp2p.New(libp2p.ResourceManager(mgr), libp2p.Transport(libp2pwebrtc.New))
		require.NoError(t, err)
		t.Cleanup(func() { sender.Close() })

		sender.Peerstore().AddAddrs(receiver.ID(), receiver.Addrs(), peerstore.PermanentAddrTTL)

		s, err := sender.NewStream(context.Background(), receiver.ID(), pid)
		if err != nil {
			continue
		}

		var b [1]byte
		_, err = io.ReadFull(s, b[:])
		if err == nil {
			success++
		}
		sender.Close()
	}
	require.Equal(t, streamLimit, success)
	// We have the maximum number of streams open. Next call should fail.
	mgr, err = rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits))
	require.NoError(t, err)

	sender, err := libp2p.New(libp2p.ResourceManager(mgr), libp2p.Transport(libp2pwebrtc.New))
	require.NoError(t, err)
	t.Cleanup(func() { sender.Close() })

	sender.Peerstore().AddAddrs(receiver.ID(), receiver.Addrs(), peerstore.PermanentAddrTTL)

	_, err = sender.NewStream(context.Background(), receiver.ID(), pid)
	require.Error(t, err)
	// Close the open streams
	wg.Done()

	// Next call should succeed
	require.Eventually(t, func() bool {
		s, err := sender.NewStream(context.Background(), receiver.ID(), pid)
		if err == nil {
			s.Close()
			return true
		}
		fmt.Println(err)
		return false
	}, 5*time.Second, 1*time.Second)
}
12 changes: 7 additions & 5 deletions p2p/test/transport/transport_test.go
@@ -382,9 +382,6 @@ func TestMoreStreamsThanOurLimits(t *testing.T) {
const streamCount = 1024
for _, tc := range transportsToTest {
t.Run(tc.Name, func(t *testing.T) {
if strings.Contains(tc.Name, "WebRTC") {
t.Skip("This test potentially exhausts the uint16 WebRTC stream ID space.")
}
listenerLimits := rcmgr.PartialLimitConfig{
PeerDefault: rcmgr.ResourceLimits{
Streams: 32,
@@ -428,7 +425,9 @@ func TestMoreStreamsThanOurLimits(t *testing.T) {
workerCount := 4

var startWorker func(workerIdx int)
var wCount atomic.Int32
startWorker = func(workerIdx int) {
fmt.Println("worker count", wCount.Add(1))
wg.Add(1)
defer wg.Done()
for {
@@ -440,7 +439,10 @@ func TestMoreStreamsThanOurLimits(t *testing.T) {
// Inline function so we can use defer
func() {
var didErr bool
defer completedStreams.Add(1)
defer func() {
x := completedStreams.Add(1)
fmt.Println("completed streams", x)
}()
defer func() {
// Only the first worker adds more workers
if workerIdx == 0 && !didErr && !sawFirstErr.Load() {
@@ -483,7 +485,6 @@ func TestMoreStreamsThanOurLimits(t *testing.T) {
return
}
err = func(s network.Stream) error {
defer s.Close()
err = s.SetDeadline(time.Now().Add(100 * time.Millisecond))
if err != nil {
return err
@@ -511,6 +512,7 @@ func TestMoreStreamsThanOurLimits(t *testing.T) {
return nil
}(s)
if err != nil && shouldRetry(err) {
fmt.Println("failed to write stream!", err)
time.Sleep(50 * time.Millisecond)
continue
}