Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Stebalien committed Jun 2, 2022
1 parent 051eef2 commit 695488b
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 16 deletions.
27 changes: 16 additions & 11 deletions network/connecteventmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,27 +80,32 @@ func (c *connectEventManager) setState(p peer.ID, newState state) {
}
}

// Waits for a change to be enqueued, or for the event manager to be stopped. Returns false if the
// connect event manager has been stopped.
func (c *connectEventManager) waitChange() bool {
for !c.stop && len(c.changeQueue) == 0 {
c.cond.Wait()
}
return !c.stop
}

func (c *connectEventManager) worker() {
c.lk.Lock()
defer c.lk.Unlock()
defer close(c.done)

for {
for !c.stop && len(c.changeQueue) == 0 {
c.cond.Wait()
}

if c.stop {
return
}

for c.waitChange() {
pid := c.changeQueue[0]
c.changeQueue[0] = peer.ID("")
c.changeQueue[0] = peer.ID("") // free the peer ID (slicing won't do that)
c.changeQueue = c.changeQueue[1:]

state, ok := c.peers[pid]
// If we've disconnected and forgotten, continue. We shouldn't reach this?
// If we've disconnected and forgotten, continue.
if !ok {
// This shouldn't be possible because _this_ thread is responsible for
// removing peers from this map, and we shouldn't get duplicate entries in
// the change queue.
log.Error("a change was enqueued for a peer we're not tracking")
continue
}

Expand Down
13 changes: 11 additions & 2 deletions network/connecteventmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func TestConnectEventManagerConnectDisconnect(t *testing.T) {
connected: true,
})

// Flush the event queue.
wait(t, cem)
require.Equal(t, expectedEvents, connListener.events)

// Block up the event loop.
connListener.Lock()
cem.Connected(peers[1])
Expand Down Expand Up @@ -91,6 +95,7 @@ func TestConnectEventManagerMarkUnresponsive(t *testing.T) {
// Don't mark as connected when we receive a message (could have been delayed).
cem.OnMessage(p)
wait(t, cem)
require.Equal(t, expectedEvents, connListener.events)

// Handle connected event.
cem.Connected(p)
Expand All @@ -100,6 +105,7 @@ func TestConnectEventManagerMarkUnresponsive(t *testing.T) {
peer: p,
connected: true,
})
require.Equal(t, expectedEvents, connListener.events)

// Becomes unresponsive.
cem.MarkUnresponsive(p)
Expand All @@ -109,14 +115,17 @@ func TestConnectEventManagerMarkUnresponsive(t *testing.T) {
peer: p,
connected: false,
})
require.Equal(t, expectedEvents, connListener.events)

// Don't expect the peer to be come connected.
cem.Connected(p)
wait(t, cem)
require.Equal(t, expectedEvents, connListener.events)

// No duplicate event.
cem.MarkUnresponsive(p)
wait(t, cem)
require.Equal(t, expectedEvents, connListener.events)

// Becomes responsive.
cem.OnMessage(p)
Expand All @@ -126,8 +135,6 @@ func TestConnectEventManagerMarkUnresponsive(t *testing.T) {
peer: p,
connected: true,
})

wait(t, cem)
require.Equal(t, expectedEvents, connListener.events)
}

Expand All @@ -148,6 +155,7 @@ func TestConnectEventManagerDisconnectAfterMarkUnresponsive(t *testing.T) {
peer: p,
connected: true,
})
require.Equal(t, expectedEvents, connListener.events)

// Becomes unresponsive.
cem.MarkUnresponsive(p)
Expand All @@ -157,6 +165,7 @@ func TestConnectEventManagerDisconnectAfterMarkUnresponsive(t *testing.T) {
peer: p,
connected: false,
})
require.Equal(t, expectedEvents, connListener.events)

cem.Disconnected(p)
wait(t, cem)
Expand Down
5 changes: 3 additions & 2 deletions network/ipfs_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ func newReceiver() *receiver {
return &receiver{
peers: make(map[peer.ID]struct{}),
messageReceived: make(chan struct{}),
connectionEvent: make(chan bool, 1),
// Avoid blocking. 100 is good enough for tests.
connectionEvent: make(chan bool, 100),
}
}

Expand Down Expand Up @@ -285,7 +286,7 @@ func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *rec
routing2 := mr.ClientWithDatastore(context.TODO(), p2, ds.NewMapDatastore())
bsnet2 := bsnet.NewFromIpfsHost(eh2, routing2)
bsnet2.Start(r2)
t.Cleanup(bsnet1.Stop)
t.Cleanup(bsnet2.Stop)
if r2.listener != nil {
eh2.Network().Notify(r2.listener)
}
Expand Down
2 changes: 1 addition & 1 deletion testnet/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
t.Fatal("Message not received from the responder")
}
}))
t.Cleanup(responder.Stop)
t.Cleanup(waiter.Stop)

messageSentAsync := bsmsg.New(true)
messageSentAsync.AddBlock(blocks.NewBlock([]byte("data")))
Expand Down

0 comments on commit 695488b

Please sign in to comment.