Skip to content

Commit

Permalink
[pkg/queue] Add StartConsumersWithFactory function (jaegertracing#2714
Browse files Browse the repository at this point in the history
)

* [pkg/queue] Add `StartConsumersWithFactory` function

This provides a way to keep state for each consumer of a bounded queue,
which is useful in certain performance-critical setups. Fixes jaegertracing#2685.

Signed-off-by: Pablo Baeyens <pablo.baeyens@datadoghq.com>

* Refactor bounded queue tests to extract common parts
The common part with the assertions was moved to a new `checkQueue`
function that is used by `TestBoundedQueue` and `TestBoundedQueueWithFactory`.

Signed-off-by: Pablo Baeyens <pablo.baeyens@datadoghq.com>

* Use http.HandlerFunc pattern for getting a Consumer from a callback

Signed-off-by: Pablo Baeyens <pablo.baeyens@datadoghq.com>

* Address review comment about unit tests

Refactor unit tests using a `helper` function that takes a function
to start consumers.

Signed-off-by: Pablo Baeyens <pablo.baeyens@datadoghq.com>
  • Loading branch information
mx-psi authored and bhiravabhatla committed Jan 25, 2021
1 parent 183af3c commit d1e9dc4
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 9 deletions.
37 changes: 30 additions & 7 deletions pkg/queue/bounded_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ import (
uatomic "go.uber.org/atomic"
)

// Consumer consumes data from a bounded queue
type Consumer interface {
Consume(item interface{})
}

// BoundedQueue implements a producer-consumer exchange similar to a ring buffer queue,
// where the queue is bounded and if it fills up due to slow consumers, the new items written by
// the producer force the earliest items to be dropped. The implementation is actually based on
Expand All @@ -38,7 +43,7 @@ type BoundedQueue struct {
stopped *uatomic.Uint32
items *chan interface{}
onDroppedItem func(item interface{})
consumer func(item interface{})
factory func() Consumer
stopCh chan struct{}
}

Expand All @@ -56,25 +61,26 @@ func NewBoundedQueue(capacity int, onDroppedItem func(item interface{})) *Bounde
}
}

// StartConsumers starts a given number of goroutines consuming items from the queue
// and passing them into the consumer callback.
func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{})) {
// StartConsumersWithFactory creates a given number of consumers consuming items
// from the queue in separate goroutines.
func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consumer) {
q.workers = num
q.consumer = consumer
q.factory = factory
var startWG sync.WaitGroup
for i := 0; i < q.workers; i++ {
q.stopWG.Add(1)
startWG.Add(1)
go func() {
startWG.Done()
defer q.stopWG.Done()
consumer := q.factory()
queue := *q.items
for {
select {
case item, ok := <-queue:
if ok {
q.size.Sub(1)
q.consumer(item)
consumer.Consume(item)
} else {
// channel closed, finish worker
return
Expand All @@ -89,6 +95,23 @@ func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{}))
startWG.Wait()
}

// ConsumerFunc is an adapter to allow the use of
// a consume function callback as a Consumer.
type ConsumerFunc func(item interface{})

// Consume calls c(item)
func (c ConsumerFunc) Consume(item interface{}) {
c(item)
}

// StartConsumers starts a given number of goroutines consuming items from the queue
// and passing them into the consumer callback.
func (q *BoundedQueue) StartConsumers(num int, callback func(item interface{})) {
q.StartConsumersWithFactory(num, func() Consumer {
return ConsumerFunc(callback)
})
}

// Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow.
func (q *BoundedQueue) Produce(item interface{}) bool {
if q.stopped.Load() != 0 {
Expand Down Expand Up @@ -171,7 +194,7 @@ func (q *BoundedQueue) Resize(capacity int) bool {
swapped := atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&q.items)), unsafe.Pointer(q.items), unsafe.Pointer(&queue))
if swapped {
// start a new set of consumers, based on the information given previously
q.StartConsumers(q.workers, q.consumer)
q.StartConsumersWithFactory(q.workers, q.factory)

// gracefully drain the existing queue
close(previous)
Expand Down
29 changes: 27 additions & 2 deletions pkg/queue/bounded_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
// In this test we run a queue with capacity 1 and a single consumer.
// We want to test the overflow behavior, so we block the consumer
// by holding a startLock before submitting items to the queue.
func TestBoundedQueue(t *testing.T) {
func helper(t *testing.T, startConsumers func(q *BoundedQueue, consumerFn func(item interface{}))) {
mFact := metricstest.NewFactory(0)
counter := mFact.Counter(metrics.Options{Name: "dropped", Tags: nil})
gauge := mFact.Gauge(metrics.Options{Name: "size", Tags: nil})
Expand All @@ -48,7 +48,7 @@ func TestBoundedQueue(t *testing.T) {
startLock.Lock() // block consumers
consumerState := newConsumerState(t)

q.StartConsumers(1, func(item interface{}) {
startConsumers(q, func(item interface{}) {
consumerState.record(item.(string))

// block further processing until startLock is released
Expand Down Expand Up @@ -112,6 +112,18 @@ func TestBoundedQueue(t *testing.T) {
assert.False(t, q.Produce("x"), "cannot push to closed queue")
}

func TestBoundedQueue(t *testing.T) {
helper(t, func(q *BoundedQueue, consumerFn func(item interface{})) {
q.StartConsumers(1, consumerFn)
})
}

func TestBoundedQueueWithFactory(t *testing.T) {
helper(t, func(q *BoundedQueue, consumerFn func(item interface{})) {
q.StartConsumersWithFactory(1, func() Consumer { return ConsumerFunc(consumerFn) })
})
}

type consumerState struct {
sync.Mutex
t *testing.T
Expand Down Expand Up @@ -332,3 +344,16 @@ func BenchmarkBoundedQueue(b *testing.B) {
q.Produce(n)
}
}

func BenchmarkBoundedQueueWithFactory(b *testing.B) {
q := NewBoundedQueue(1000, func(item interface{}) {
})

q.StartConsumersWithFactory(10, func() Consumer {
return ConsumerFunc(func(item interface{}) {})
})

for n := 0; n < b.N; n++ {
q.Produce(n)
}
}

0 comments on commit d1e9dc4

Please sign in to comment.