Skip to content

Commit

Permalink
fix: in the swarm move Connectedness emit after releasing conns (#2373)
Browse files Browse the repository at this point in the history
* fix: in the swarm move Connectedness emit after releasing conns

go-libp2p-kad-dht now listen to both EvtPeerIdentificationCompleted and EvtPeerConnectednessChanged
and EvtPeerIdentificationCompleted calls .ConnsToPeer inorder to do some filtering.

However it happens that it deadlocks because if the swarm is trying to emit a EvtPeerConnectednessChanged
while the subscriber is trying to process an EvtPeerIdentificationCompleted, the subscriber is stuck on
s.conns.RLock() while the swarm wont release it before having sent EvtPeerConnectednessChanged.
Deadlock !

I havn't confirmed this fixes my bug given this takes time to reproduce, I'll startup a new experiment soon.

* Fix other deadlock and add a test

* Make test a little faster

* Bind on localhost

---------

Co-authored-by: Marco Munizaga <git@marcopolo.io>
  • Loading branch information
2 people authored and marten-seemann committed Jun 20, 2023
1 parent 703c3a4 commit 7e046ce
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 21 deletions.
53 changes: 32 additions & 21 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,12 +340,7 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
}

c.streams.m = make(map[*Stream]struct{})
if len(s.conns.m[p]) == 0 { // first connection
s.emitter.Emit(event.EvtPeerConnectednessChanged{
Peer: p,
Connectedness: network.Connected,
})
}
isFirstConnection := len(s.conns.m[p]) == 0
s.conns.m[p] = append(s.conns.m[p], c)

// Add two swarm refs:
Expand All @@ -358,6 +353,15 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
c.notifyLk.Lock()
s.conns.Unlock()

// Emit event after releasing `s.conns` lock so that a consumer can still
// use swarm methods that need the `s.conns` lock.
if isFirstConnection {
s.emitter.Emit(event.EvtPeerConnectednessChanged{
Peer: p,
Connectedness: network.Connected,
})
}

s.notifyAll(func(f network.Notifiee) {
f.Connected(s, c)
})
Expand Down Expand Up @@ -637,25 +641,32 @@ func (s *Swarm) removeConn(c *Conn) {
p := c.RemotePeer()

s.conns.Lock()
defer s.conns.Unlock()

cs := s.conns.m[p]

if len(cs) == 1 {
delete(s.conns.m, p)
s.conns.Unlock()

// Emit event after releasing `s.conns` lock so that a consumer can still
// use swarm methods that need the `s.conns` lock.
s.emitter.Emit(event.EvtPeerConnectednessChanged{
Peer: p,
Connectedness: network.NotConnected,
})
return
}

defer s.conns.Unlock()

for i, ci := range cs {
if ci == c {
if len(cs) == 1 {
delete(s.conns.m, p)
s.emitter.Emit(event.EvtPeerConnectednessChanged{
Peer: p,
Connectedness: network.NotConnected,
})
} else {
// NOTE: We're intentionally preserving order.
// This way, connections to a peer are always
// sorted oldest to newest.
copy(cs[i:], cs[i+1:])
cs[len(cs)-1] = nil
s.conns.m[p] = cs[:len(cs)-1]
}
// NOTE: We're intentionally preserving order.
// This way, connections to a peer are always
// sorted oldest to newest.
copy(cs[i:], cs[i+1:])
cs[len(cs)-1] = nil
s.conns.m[p] = cs[:len(cs)-1]
break
}
}
Expand Down
49 changes: 49 additions & 0 deletions p2p/net/swarm/swarm_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,52 @@ func TestConnectednessEventsSingleConn(t *testing.T) {
checkEvent(t, sub1, event.EvtPeerConnectednessChanged{Peer: s2.LocalPeer(), Connectedness: network.NotConnected})
checkEvent(t, sub2, event.EvtPeerConnectednessChanged{Peer: s1.LocalPeer(), Connectedness: network.NotConnected})
}

func TestNoDeadlockWhenConsumingConnectednessEvents(t *testing.T) {
dialerEventBus := eventbus.NewBus()
dialer := swarmt.GenSwarm(t, swarmt.OptDialOnly, swarmt.EventBus(dialerEventBus))
defer dialer.Close()

listener := swarmt.GenSwarm(t, swarmt.OptDialOnly)
addrsToListen := []ma.Multiaddr{
ma.StringCast("/ip4/127.0.0.1/udp/0/quic-v1"),
}

if err := listener.Listen(addrsToListen...); err != nil {
t.Fatal(err)
}
listenedAddrs := listener.ListenAddresses()

dialer.Peerstore().AddAddrs(listener.LocalPeer(), listenedAddrs, time.Hour)

sub, err := dialerEventBus.Subscribe(new(event.EvtPeerConnectednessChanged))
require.NoError(t, err)

ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// A slow consumer
go func() {
for {
select {
case <-ctx.Done():
return
case <-sub.Out():
time.Sleep(100 * time.Millisecond)
// Do something with the swarm that needs the conns lock
_ = dialer.ConnsToPeer(listener.LocalPeer())
time.Sleep(100 * time.Millisecond)
}
}
}()

for i := 0; i < 10; i++ {
// Connect and disconnect to trigger a bunch of events
_, err := dialer.DialPeer(context.Background(), listener.LocalPeer())
require.NoError(t, err)
dialer.ClosePeer(listener.LocalPeer())
}

// The test should finish without deadlocking
}

0 comments on commit 7e046ce

Please sign in to comment.