Skip to content

Commit

Permalink
Merge pull request ipfs#241 from libp2p/dq-var-test-races
Browse files Browse the repository at this point in the history
Fix races with DialQueue variables
  • Loading branch information
raulk committed Jan 31, 2019
2 parents b598d08 + 46e8562 commit 1b1fb7e
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 32 deletions.
28 changes: 17 additions & 11 deletions dial_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
queue "github.com/libp2p/go-libp2p-peerstore/queue"
)

var (
const (
// DialQueueMinParallelism is the minimum number of worker dial goroutines that will be alive at any time.
DialQueueMinParallelism = 6
// DialQueueMaxParallelism is the maximum number of worker dial goroutines that can be alive at any time.
Expand All @@ -25,8 +25,10 @@ type dialQueue struct {
ctx context.Context
dialFn func(context.Context, peer.ID) error

nWorkers int
scalingFactor float64
nWorkers int
scalingFactor float64
scalingMutePeriod time.Duration
maxIdle time.Duration

in *queue.ChanQueue
out *queue.ChanQueue
Expand Down Expand Up @@ -61,12 +63,16 @@ type waitingCh struct {
// Dialler throttling (e.g. FD limit exceeded) is a concern, as we can easily spin up more workers to compensate, and
// end up adding fuel to the fire. Since we have no deterministic way to detect this for now, we hard-limit concurrency
// to DialQueueMaxParallelism.
func newDialQueue(ctx context.Context, target string, in *queue.ChanQueue, dialFn func(context.Context, peer.ID) error) *dialQueue {
func newDialQueue(ctx context.Context, target string, in *queue.ChanQueue, dialFn func(context.Context, peer.ID) error,
maxIdle, scalingMutePeriod time.Duration,
) *dialQueue {
sq := &dialQueue{
ctx: ctx,
dialFn: dialFn,
nWorkers: DialQueueMinParallelism,
scalingFactor: 1.5,
ctx: ctx,
dialFn: dialFn,
nWorkers: DialQueueMinParallelism,
scalingFactor: 1.5,
scalingMutePeriod: scalingMutePeriod,
maxIdle: maxIdle,

in: in,
out: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(target)),
Expand Down Expand Up @@ -145,13 +151,13 @@ func (dq *dialQueue) control() {
dialled = nil
}
case <-dq.growCh:
if time.Since(lastScalingEvt) < DialQueueScalingMutePeriod {
if time.Since(lastScalingEvt) < dq.scalingMutePeriod {
continue
}
dq.grow()
lastScalingEvt = time.Now()
case <-dq.shrinkCh:
if time.Since(lastScalingEvt) < DialQueueScalingMutePeriod {
if time.Since(lastScalingEvt) < dq.scalingMutePeriod {
continue
}
dq.shrink()
Expand Down Expand Up @@ -259,7 +265,7 @@ func (dq *dialQueue) worker() {
case <-idleTimer.C:
default:
}
idleTimer.Reset(DialQueueMaxIdle)
idleTimer.Reset(dq.maxIdle)

select {
case <-dq.dieCh:
Expand Down
24 changes: 4 additions & 20 deletions dial_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package dht

import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
Expand All @@ -12,12 +11,7 @@ import (
queue "github.com/libp2p/go-libp2p-peerstore/queue"
)

func init() {
DialQueueScalingMutePeriod = 0
}

func TestDialQueueGrowsOnSlowDials(t *testing.T) {
DialQueueMaxIdle = 10 * time.Minute

in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test"))
hang := make(chan struct{})
Expand All @@ -35,7 +29,7 @@ func TestDialQueueGrowsOnSlowDials(t *testing.T) {
}

// remove the mute period to grow faster.
dq := newDialQueue(context.Background(), "test", in, dialFn)
dq := newDialQueue(context.Background(), "test", in, dialFn, 10*time.Minute, 0)

for i := 0; i < 4; i++ {
_ = dq.Consume()
Expand All @@ -55,7 +49,6 @@ func TestDialQueueGrowsOnSlowDials(t *testing.T) {

func TestDialQueueShrinksWithNoConsumers(t *testing.T) {
// reduce interference from the other shrink path.
DialQueueMaxIdle = 10 * time.Minute

in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test"))
hang := make(chan struct{})
Expand All @@ -68,12 +61,7 @@ func TestDialQueueShrinksWithNoConsumers(t *testing.T) {
return nil
}

dq := newDialQueue(context.Background(), "test", in, dialFn)

defer func() {
recover()
fmt.Println(dq.nWorkers)
}()
dq := newDialQueue(context.Background(), "test", in, dialFn, 10*time.Minute, 0)

// acquire 3 consumers, everytime we acquire a consumer, we will grow the pool because no dial job is completed
// and immediately returnable.
Expand Down Expand Up @@ -117,8 +105,6 @@ func TestDialQueueShrinksWithNoConsumers(t *testing.T) {

// Inactivity = workers are idle because the DHT query is progressing slow and is producing too few peers to dial.
func TestDialQueueShrinksWithWhenIdle(t *testing.T) {
DialQueueMaxIdle = 1 * time.Second

in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test"))
hang := make(chan struct{})

Expand All @@ -135,7 +121,7 @@ func TestDialQueueShrinksWithWhenIdle(t *testing.T) {
in.EnqChan <- peer.ID(i)
}

dq := newDialQueue(context.Background(), "test", in, dialFn)
dq := newDialQueue(context.Background(), "test", in, dialFn, time.Second, 0)

// keep up to speed with backlog by releasing the dial function every time we acquire a channel.
for i := 0; i < 13; i++ {
Expand All @@ -161,8 +147,6 @@ func TestDialQueueShrinksWithWhenIdle(t *testing.T) {
}

func TestDialQueueMutePeriodHonored(t *testing.T) {
DialQueueScalingMutePeriod = 2 * time.Second

in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test"))
hang := make(chan struct{})
var wg sync.WaitGroup
Expand All @@ -178,7 +162,7 @@ func TestDialQueueMutePeriodHonored(t *testing.T) {
in.EnqChan <- peer.ID(i)
}

dq := newDialQueue(context.Background(), "test", in, dialFn)
dq := newDialQueue(context.Background(), "test", in, dialFn, DialQueueMaxIdle, 2*time.Second)

// pick up three consumers.
for i := 0; i < 3; i++ {
Expand Down
2 changes: 1 addition & 1 deletion query.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
peersToQuery: peersToQuery,
proc: proc,
}
r.peersDialed = newDialQueue(ctx, q.key, peersToQuery, r.dialPeer)
r.peersDialed = newDialQueue(ctx, q.key, peersToQuery, r.dialPeer, DialQueueMaxIdle, DialQueueScalingMutePeriod)
return r
}

Expand Down

0 comments on commit 1b1fb7e

Please sign in to comment.