Skip to content

Commit

Permalink
Address review comment about unit tests
Browse files Browse the repository at this point in the history
Refactor unit tests using a `helper` function that takes a function
to start consumers.

Signed-off-by: Pablo Baeyens <pablo.baeyens@datadoghq.com>
  • Loading branch information
mx-psi committed Jan 7, 2021
1 parent d968bd5 commit c955e1d
Showing 1 changed file with 31 additions and 75 deletions.
106 changes: 31 additions & 75 deletions pkg/queue/bounded_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,33 @@ import (
uatomic "go.uber.org/atomic"
)

func checkQueue(
t *testing.T,
q *BoundedQueue,
startLock *sync.Mutex,
consumerState *consumerState,
mFact *metricstest.Factory,
) {
// In this test we run a queue with capacity 1 and a single consumer.
// We want to test the overflow behavior, so we block the consumer
// by holding a startLock before submitting items to the queue.
func helper(t *testing.T, startConsumers func(q *BoundedQueue, consumerFn func(item interface{}))) {
mFact := metricstest.NewFactory(0)
counter := mFact.Counter(metrics.Options{Name: "dropped", Tags: nil})
gauge := mFact.Gauge(metrics.Options{Name: "size", Tags: nil})

q := NewBoundedQueue(1, func(item interface{}) {
counter.Inc(1)
})
assert.Equal(t, 1, q.Capacity())

var startLock sync.Mutex

startLock.Lock() // block consumers
consumerState := newConsumerState(t)

startConsumers(q, func(item interface{}) {
consumerState.record(item.(string))

// block further processing until startLock is released
startLock.Lock()
//lint:ignore SA2001 empty section is ok
startLock.Unlock()
})

assert.True(t, q.Produce("a"))

// at this point "a" may or may not have been received by the consumer go-routine
Expand Down Expand Up @@ -93,56 +112,16 @@ func checkQueue(
assert.False(t, q.Produce("x"), "cannot push to closed queue")
}

// In this test we run a queue with capacity 1 and a single consumer.
// We want to test the overflow behavior, so we block the consumer
// by holding a startLock before submitting items to the queue.
func TestBoundedQueue(t *testing.T) {
mFact := metricstest.NewFactory(0)
counter := mFact.Counter(metrics.Options{Name: "dropped", Tags: nil})

q := NewBoundedQueue(1, func(item interface{}) {
counter.Inc(1)
})
assert.Equal(t, 1, q.Capacity())

var startLock sync.Mutex

startLock.Lock() // block consumers
consumerState := newConsumerState(t)

q.StartConsumers(1, func(item interface{}) {
consumerState.record(item.(string))

// block further processing until startLock is released
startLock.Lock()
//lint:ignore SA2001 empty section is ok
startLock.Unlock()
helper(t, func(q *BoundedQueue, consumerFn func(item interface{})) {
q.StartConsumers(1, consumerFn)
})

checkQueue(t, q, &startLock, consumerState, mFact)
}

// This test is identical to the previous one but we start the
// queue using a consumer factory instead of a callback.
func TestBoundedQueueWithFactory(t *testing.T) {
mFact := metricstest.NewFactory(0)
counter := mFact.Counter(metrics.Options{Name: "dropped", Tags: nil})

q := NewBoundedQueue(1, func(item interface{}) {
counter.Inc(1)
})
assert.Equal(t, 1, q.Capacity())

var startLock sync.Mutex

startLock.Lock() // block consumers
consumerState := newConsumerState(t)

q.StartConsumersWithFactory(1, func() Consumer {
return newStatefulConsumer(&startLock, consumerState)
helper(t, func(q *BoundedQueue, consumerFn func(item interface{})) {
q.StartConsumersWithFactory(1, func() Consumer { return ConsumerFunc(consumerFn) })
})

checkQueue(t, q, &startLock, consumerState, mFact)
}

type consumerState struct {
Expand Down Expand Up @@ -194,24 +173,6 @@ func (s *consumerState) assertConsumed(expected map[string]bool) {
assert.Equal(s.t, expected, s.snapshot())
}

type statefulConsumer struct {
*sync.Mutex
*consumerState
}

func (s *statefulConsumer) Consume(item interface{}) {
s.record(item.(string))

// block further processing until the lock is released
s.Lock()
//lint:ignore SA2001 empty section is ok
s.Unlock()
}

func newStatefulConsumer(startLock *sync.Mutex, cs *consumerState) Consumer {
return &statefulConsumer{startLock, cs}
}

func TestResizeUp(t *testing.T) {
q := NewBoundedQueue(2, func(item interface{}) {
fmt.Printf("dropped: %v\n", item)
Expand Down Expand Up @@ -384,17 +345,12 @@ func BenchmarkBoundedQueue(b *testing.B) {
}
}

// nopConsumer is a no-op consumer
type nopConsumer struct{}

func (*nopConsumer) Consume(item interface{}) {}

func BenchmarkBoundedQueueWithFactory(b *testing.B) {
q := NewBoundedQueue(1000, func(item interface{}) {
})

q.StartConsumersWithFactory(10, func() Consumer {
return &nopConsumer{}
return ConsumerFunc(func(item interface{}) {})
})

for n := 0; n < b.N; n++ {
Expand Down

0 comments on commit c955e1d

Please sign in to comment.