From 6a6680cef6399c623a330fa90fe0d829db002a73 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 21 Apr 2020 04:02:58 -0700 Subject: [PATCH] Removing sigUnWait from controller --- libbeat/publisher/pipeline/controller.go | 1 - libbeat/publisher/pipeline/retry.go | 47 ++++++++++++++++-------- 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index ed6d7ab5f29..837a70eab77 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -137,7 +137,6 @@ func (c *outputController) Set(outGrp outputs.Group) { // restart consumer (potentially blocked by retryer) c.consumer.sigContinue() - c.consumer.sigUnWait() c.observer.updateOutputGroup() } diff --git a/libbeat/publisher/pipeline/retry.go b/libbeat/publisher/pipeline/retry.go index 5521f3444af..7c1b955b48e 100644 --- a/libbeat/publisher/pipeline/retry.go +++ b/libbeat/publisher/pipeline/retry.go @@ -168,14 +168,7 @@ func (r *retryer) loop() { active = buffer[0] activeSize = len(active.Events()) if !consumerBlocked { - consumerBlocked = blockConsumer(numOutputs, len(buffer)) - if consumerBlocked { - log.Info("retryer: send wait signal to consumer") - if r.consumer != nil { - r.consumer.sigWait() - } - log.Info(" done") - } + consumerBlocked = r.checkConsumerBlock(numOutputs, len(buffer)) } } @@ -193,27 +186,49 @@ func (r *retryer) loop() { } if consumerBlocked { - consumerBlocked = blockConsumer(numOutputs, len(buffer)) - if !consumerBlocked { - log.Info("retryer: send unwait-signal to consumer") - if r.consumer != nil { - r.consumer.sigUnWait() - } - log.Info(" done") - } + consumerBlocked = r.checkConsumerBlock(numOutputs, len(buffer)) } case sig := <-r.sig: switch sig.tag { case sigRetryerOutputAdded: numOutputs++ + if consumerBlocked { + consumerBlocked = r.checkConsumerBlock(numOutputs, len(buffer)) + } case sigRetryerOutputRemoved: numOutputs-- + if !consumerBlocked { + consumerBlocked = r.checkConsumerBlock(numOutputs, len(buffer)) + } } } } } +func (r *retryer) checkConsumerBlock(numOutputs, numBatches int) bool { + consumerBlocked := blockConsumer(numOutputs, numBatches) + if r.consumer == nil { + return consumerBlocked + } + + if consumerBlocked { + r.logger.Info("retryer: send wait signal to consumer") + if r.consumer != nil { + r.consumer.sigWait() + } + r.logger.Info(" done") + } else { + r.logger.Info("retryer: send unwait signal to consumer") + if r.consumer != nil { + r.consumer.sigUnWait() + } + r.logger.Info(" done") + } + + return consumerBlocked +} + func blockConsumer(numOutputs, numBatches int) bool { return numBatches/3 >= numOutputs }