From d1e9dc41dbec2536779a36fa65d065bc34743ba4 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Thu, 7 Jan 2021 17:44:18 +0100 Subject: [PATCH] [pkg/queue] Add `StartConsumersWithFactory` function (#2714) * [pkg/queue] Add `StartConsumersWithFactory` function This provides a way to keep state for each consumer of a bounded queue, which is useful in certain performance-critical setups. Fixes #2685. Signed-off-by: Pablo Baeyens * Refactor bounded queue tests to extract common parts The common part with the assertions was moved to a new `checkQueue` function that is used by `TestBoundedQueue` and `TestBoundedQueueWithFactory`. Signed-off-by: Pablo Baeyens * Use http.HandlerFunc pattern for getting a Consumer from a callback Signed-off-by: Pablo Baeyens * Address review comment about unit tests Refactor unit tests using a `helper` function that takes a function to start consumers. Signed-off-by: Pablo Baeyens --- pkg/queue/bounded_queue.go | 37 ++++++++++++++++++++++++++------- pkg/queue/bounded_queue_test.go | 29 ++++++++++++++++++++++++-- 2 files changed, 57 insertions(+), 9 deletions(-) diff --git a/pkg/queue/bounded_queue.go b/pkg/queue/bounded_queue.go index db4c5356a4c..0ade4b02c7b 100644 --- a/pkg/queue/bounded_queue.go +++ b/pkg/queue/bounded_queue.go @@ -25,6 +25,11 @@ import ( uatomic "go.uber.org/atomic" ) +// Consumer consumes data from a bounded queue +type Consumer interface { + Consume(item interface{}) +} + // BoundedQueue implements a producer-consumer exchange similar to a ring buffer queue, // where the queue is bounded and if it fills up due to slow consumers, the new items written by // the producer force the earliest items to be dropped. The implementation is actually based on @@ -38,7 +43,7 @@ type BoundedQueue struct { stopped *uatomic.Uint32 items *chan interface{} onDroppedItem func(item interface{}) - consumer func(item interface{}) + factory func() Consumer stopCh chan struct{} } @@ -56,11 +61,11 @@ func NewBoundedQueue(capacity int, onDroppedItem func(item interface{})) *Bounde } } -// StartConsumers starts a given number of goroutines consuming items from the queue -// and passing them into the consumer callback. -func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{})) { +// StartConsumersWithFactory creates a given number of consumers consuming items +// from the queue in separate goroutines. +func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consumer) { q.workers = num - q.consumer = consumer + q.factory = factory var startWG sync.WaitGroup for i := 0; i < q.workers; i++ { q.stopWG.Add(1) @@ -68,13 +73,14 @@ func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{})) go func() { startWG.Done() defer q.stopWG.Done() + consumer := q.factory() queue := *q.items for { select { case item, ok := <-queue: if ok { q.size.Sub(1) - q.consumer(item) + consumer.Consume(item) } else { // channel closed, finish worker return @@ -89,6 +95,23 @@ func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{})) startWG.Wait() } +// ConsumerFunc is an adapter to allow the use of +// a consume function callback as a Consumer. +type ConsumerFunc func(item interface{}) + +// Consume calls c(item) +func (c ConsumerFunc) Consume(item interface{}) { + c(item) +} + +// StartConsumers starts a given number of goroutines consuming items from the queue +// and passing them into the consumer callback. +func (q *BoundedQueue) StartConsumers(num int, callback func(item interface{})) { + q.StartConsumersWithFactory(num, func() Consumer { + return ConsumerFunc(callback) + }) +} + // Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow. func (q *BoundedQueue) Produce(item interface{}) bool { if q.stopped.Load() != 0 { @@ -171,7 +194,7 @@ func (q *BoundedQueue) Resize(capacity int) bool { swapped := atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&q.items)), unsafe.Pointer(q.items), unsafe.Pointer(&queue)) if swapped { // start a new set of consumers, based on the information given previously - q.StartConsumers(q.workers, q.consumer) + q.StartConsumersWithFactory(q.workers, q.factory) // gracefully drain the existing queue close(previous) diff --git a/pkg/queue/bounded_queue_test.go b/pkg/queue/bounded_queue_test.go index 1aa617a093b..1091f3f370f 100644 --- a/pkg/queue/bounded_queue_test.go +++ b/pkg/queue/bounded_queue_test.go @@ -33,7 +33,7 @@ import ( // In this test we run a queue with capacity 1 and a single consumer. // We want to test the overflow behavior, so we block the consumer // by holding a startLock before submitting items to the queue. -func TestBoundedQueue(t *testing.T) { +func helper(t *testing.T, startConsumers func(q *BoundedQueue, consumerFn func(item interface{}))) { mFact := metricstest.NewFactory(0) counter := mFact.Counter(metrics.Options{Name: "dropped", Tags: nil}) gauge := mFact.Gauge(metrics.Options{Name: "size", Tags: nil}) @@ -48,7 +48,7 @@ func TestBoundedQueue(t *testing.T) { startLock.Lock() // block consumers consumerState := newConsumerState(t) - q.StartConsumers(1, func(item interface{}) { + startConsumers(q, func(item interface{}) { consumerState.record(item.(string)) // block further processing until startLock is released @@ -112,6 +112,18 @@ func TestBoundedQueue(t *testing.T) { assert.False(t, q.Produce("x"), "cannot push to closed queue") } +func TestBoundedQueue(t *testing.T) { + helper(t, func(q *BoundedQueue, consumerFn func(item interface{})) { + q.StartConsumers(1, consumerFn) + }) +} + +func TestBoundedQueueWithFactory(t *testing.T) { + helper(t, func(q *BoundedQueue, consumerFn func(item interface{})) { + q.StartConsumersWithFactory(1, func() Consumer { return ConsumerFunc(consumerFn) }) + }) +} + type consumerState struct { sync.Mutex t *testing.T @@ -332,3 +344,16 @@ func BenchmarkBoundedQueue(b *testing.B) { q.Produce(n) } } + +func BenchmarkBoundedQueueWithFactory(b *testing.B) { + q := NewBoundedQueue(1000, func(item interface{}) { + }) + + q.StartConsumersWithFactory(10, func() Consumer { + return ConsumerFunc(func(item interface{}) {}) + }) + + for n := 0; n < b.N; n++ { + q.Produce(n) + } +}