From a6d679a484f3d2c36724b8b2f2c73d3a764a4d10 Mon Sep 17 00:00:00 2001
From: Benjamin Benoist
Date: Wed, 29 Mar 2023 20:02:38 +0200
Subject: [PATCH] PubSub: add second layer of retry (close #304)

---
 .../model.scala                                    |  3 +-
 .../config/ConfigSpec.scala                        |  3 +-
 examples/config.pubsub.extended.hocon              |  7 ++-
 .../sinks/KinesisSink.scala                        |  8 +--
 pubsub/src/main/resources/application.conf         |  1 +
 .../sinks/GooglePubSubSink.scala                   | 56 +++++++++++++++----
 .../sinks/SqsSink.scala                            |  4 +-
 7 files changed, 63 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala
index 90287d86b..aa4052acd 100644
--- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala
+++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala
@@ -138,7 +138,8 @@ package model {
       maxBytes: Int,
       googleProjectId: String,
       backoffPolicy: GooglePubSubBackoffPolicyConfig,
-      startupCheckInterval: FiniteDuration
+      startupCheckInterval: FiniteDuration,
+      retryInterval: FiniteDuration
     ) extends SinkConfig
     final case class Kafka(
       maxBytes: Int,
diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/config/ConfigSpec.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/config/ConfigSpec.scala
index bc595f0b8..b04576656 100644
--- a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/config/ConfigSpec.scala
+++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/config/ConfigSpec.scala
@@ -119,7 +119,8 @@ abstract class ConfigSpec extends Specification {
             maxRpcTimeout = 10000,
             rpcTimeoutMultiplier = 2
           ),
-          startupCheckInterval = 1.second
+          startupCheckInterval = 1.second,
+          retryInterval = 10.seconds
         )
       case "sqs" =>
         Sqs(
diff --git a/examples/config.pubsub.extended.hocon b/examples/config.pubsub.extended.hocon
index 89858caf5..63c050abe 100644
--- a/examples/config.pubsub.extended.hocon
+++ b/examples/config.pubsub.extended.hocon
@@ -202,10 +202,15 @@ collector {
       # Default: 10 MB
       maxBytes = 10000000
 
-      # When collector starts, it checks if PubSub topics exist with listTopics.
+      # Optional. When the collector starts, it checks if PubSub topics exist with listTopics.
       # This is the interval for the calls.
       # /sink-health is made healthy as soon as requests are successful or records are successfully inserted.
       startupCheckInterval = 1 second
+
+      # Optional. The collector uses the built-in retry mechanism of the PubSub API.
+      # If these retries fail, the events are added to a buffer
+      # and the collector retries sending them every retryInterval.
+      retryInterval = 10 seconds
     }
 
diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala
index fc6b087b2..8c7f4396b 100644
--- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala
+++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala
@@ -351,7 +351,7 @@ class KinesisSink private (
   }
 
   private def checkKinesisHealth(): Unit = {
-    val healthThread = new Runnable {
+    val healthRunnable = new Runnable {
       override def run() {
         while (!kinesisHealthy) {
           Try {
@@ -373,11 +373,11 @@ class KinesisSink private (
         }
       }
     }
-    executorService.execute(healthThread)
+    executorService.execute(healthRunnable)
   }
 
   private def checkSqsHealth(): Unit = maybeSqs.foreach { sqs =>
-    val healthThread = new Runnable {
+    val healthRunnable = new Runnable {
       override def run() {
         while (!sqsHealthy) {
           Try {
@@ -393,7 +393,7 @@ class KinesisSink private (
         }
       }
     }
-    executorService.execute(healthThread)
+    executorService.execute(healthRunnable)
   }
 }
 
diff --git a/pubsub/src/main/resources/application.conf b/pubsub/src/main/resources/application.conf
index 1bb139479..b96335869 100644
--- a/pubsub/src/main/resources/application.conf
+++ b/pubsub/src/main/resources/application.conf
@@ -17,6 +17,7 @@ collector {
       maxBytes = 10000000
 
       startupCheckInterval = 1 second
+      retryInterval = 10 seconds
     }
 
     buffer {
diff --git a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GooglePubSubSink.scala b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GooglePubSubSink.scala
index eb0ce4cd8..ad149dd4e 100644
--- a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GooglePubSubSink.scala
+++ b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GooglePubSubSink.scala
@@ -16,8 +16,10 @@ package sinks
 import java.util.concurrent.Executors
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
 import scala.util._
 import scala.concurrent.duration.{FiniteDuration, MILLISECONDS}
+import scala.concurrent.duration._
 
 import org.threeten.bp.Duration
 
@@ -42,10 +44,18 @@ import cats.syntax.either._
 
 import com.snowplowanalytics.snowplow.collectors.scalastream.model._
 
-class GooglePubSubSink private (val maxBytes: Int, publisher: Publisher, projectId: String, topicName: String)
-    extends Sink {
-  private val logExecutor   = Executors.newSingleThreadExecutor()
-  private val checkExecutor = Executors.newSingleThreadExecutor()
+class GooglePubSubSink private (
+  val maxBytes: Int,
+  publisher: Publisher,
+  projectId: String,
+  topicName: String,
+  retryInterval: FiniteDuration
+) extends Sink {
+  private val logExecutor = Executors.newSingleThreadExecutor()
+  // 2 = 1 for health check + 1 for retrying failed inserts
+  private val scheduledExecutor = Executors.newScheduledThreadPool(2)
+
+  private val failedInsertsBuffer = ListBuffer.empty[Array[Byte]]
 
   @volatile private var pubsubHealthy: Boolean = false
   override def isHealthy: Boolean = pubsubHealthy
@@ -75,6 +85,9 @@ class GooglePubSubSink private (val maxBytes: Int, publisher: Publisher, project
             )
           case t => log.error(s"Publishing message to $topicName failed with error: ${t.getMessage}")
         }
+        failedInsertsBuffer.synchronized {
+          failedInsertsBuffer.prepend(event)
+        }
       }
     },
     logExecutor
@@ -85,8 +98,8 @@ class GooglePubSubSink private (val maxBytes: Int, publisher: Publisher, project
 
   override def shutdown(): Unit = {
     publisher.shutdown()
-    checkExecutor.shutdown()
-    checkExecutor.awaitTermination(10000, MILLISECONDS)
+    scheduledExecutor.shutdown()
+    scheduledExecutor.awaitTermination(10000, MILLISECONDS)
     ()
   }
 
@@ -98,11 +111,28 @@ class GooglePubSubSink private (val maxBytes: Int, publisher: Publisher, project
   private def eventToPubsubMessage(event: Array[Byte]): PubsubMessage =
     PubsubMessage.newBuilder.setData(ByteString.copyFrom(event)).build()
 
+  private def retryRunnable: Runnable = new Runnable {
+    override def run() {
+      val failedInserts = failedInsertsBuffer.synchronized {
+        val records = failedInsertsBuffer.toList
+        failedInsertsBuffer.clear()
+        records
+      }
+      if (failedInserts.nonEmpty) {
+        log.info(s"Retrying to insert ${failedInserts.size} records into $topicName")
+        storeRawEvents(failedInserts, "NOT USED")
+      }
+      scheduledExecutor.schedule(retryRunnable, retryInterval.toMillis, MILLISECONDS)
+      ()
+    }
+  }
+  scheduledExecutor.schedule(retryRunnable, retryInterval.toMillis, MILLISECONDS)
+
   private def checkPubsubHealth(
     customProviders: Option[(TransportChannelProvider, CredentialsProvider)],
     startupCheckInterval: FiniteDuration
   ): Unit = {
-    val healthThread = new Runnable {
+    val healthRunnable = new Runnable {
       override def run() {
         val topicAdmin = GooglePubSubSink.createTopicAdmin(customProviders)
 
@@ -127,7 +157,7 @@ class GooglePubSubSink private (val maxBytes: Int, publisher: Publisher, project
         }
       }
     }
-    checkExecutor.execute(healthThread)
+    scheduledExecutor.execute(healthRunnable)
   }
 }
 
@@ -149,8 +179,14 @@ object GooglePubSubSink {
           (channelProvider, credentialsProvider)
         }
       publisher <- createPublisher(googlePubSubConfig.googleProjectId, topicName, batching, retry, customProviders)
-      sink = new GooglePubSubSink(maxBytes, publisher, googlePubSubConfig.googleProjectId, topicName)
-      _    = sink.checkPubsubHealth(customProviders, googlePubSubConfig.startupCheckInterval)
+      sink = new GooglePubSubSink(
+        maxBytes,
+        publisher,
+        googlePubSubConfig.googleProjectId,
+        topicName,
+        googlePubSubConfig.retryInterval
+      )
+      _ = sink.checkPubsubHealth(customProviders, googlePubSubConfig.startupCheckInterval)
     } yield sink
 
   private val UserAgent = s"snowplow/stream-collector-${generated.BuildInfo.version}"
diff --git a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala
index 6b6358667..b3e388ad8 100644
--- a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala
+++ b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala
@@ -246,7 +246,7 @@ class SqsSink private (
   }
 
   private def checkSqsHealth(): Unit = {
-    val healthThread = new Runnable {
+    val healthRunnable = new Runnable {
       override def run() {
         while (!sqsHealthy) {
           Try {
@@ -262,7 +262,7 @@ class SqsSink private (
         }
       }
     }
-    executorService.execute(healthThread)
+    executorService.execute(healthRunnable)
   }
 }
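
Note (not part of the patch): the sketch below is only an illustration of the retry layer this diff introduces — a synchronized in-memory buffer that the publish failure callback prepends to, plus a scheduled task that drains it every retryInterval and feeds the events back through the publish path. The FailedInsertRetrier class and its publish parameter are hypothetical stand-ins for GooglePubSubSink and storeRawEvents; they are not names from the codebase.

import java.util.concurrent.{Executors, TimeUnit}

import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._

// Hypothetical stand-in for the sink: `publish` plays the role of storeRawEvents
// and returns true when the event was accepted by PubSub, false otherwise.
class FailedInsertRetrier(publish: Array[Byte] => Boolean, retryInterval: FiniteDuration) {

  // Same idea as failedInsertsBuffer in the patch: failed events are parked here.
  private val failedInserts = ListBuffer.empty[Array[Byte]]

  // One thread is enough for the periodic retry; the patch shares a pool of 2
  // with the startup health check.
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Called from the publish failure callback.
  def add(event: Array[Byte]): Unit =
    failedInserts.synchronized {
      failedInserts.prepend(event)
    }

  // Drain the buffer, try again, and park whatever still fails.
  private val retryRunnable: Runnable = new Runnable {
    override def run(): Unit = {
      val toRetry = failedInserts.synchronized {
        val records = failedInserts.toList
        failedInserts.clear()
        records
      }
      toRetry.filterNot(publish).foreach(add)
      // Re-schedule ourselves instead of using scheduleAtFixedRate, as the patch does.
      scheduler.schedule(retryRunnable, retryInterval.toMillis, TimeUnit.MILLISECONDS)
      ()
    }
  }

  // The first tick is armed at construction time, mirroring the class-level schedule call.
  scheduler.schedule(retryRunnable, retryInterval.toMillis, TimeUnit.MILLISECONDS)

  def shutdown(): Unit = {
    scheduler.shutdown()
    scheduler.awaitTermination(10000, TimeUnit.MILLISECONDS)
    ()
  }
}

With the default retryInterval = 10 seconds this matches the behaviour wired into application.conf: events rejected after the PubSub publisher's own backoff are parked in memory and re-sent ten seconds later. The buffer lives only in memory, so events still parked there are lost if the collector shuts down before a retry succeeds.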