Skip to content

Commit

Permalink
PubSub: add second layer of retry (close #304)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Mar 30, 2023
1 parent effa32b commit a6d679a
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ package model {
maxBytes: Int,
googleProjectId: String,
backoffPolicy: GooglePubSubBackoffPolicyConfig,
startupCheckInterval: FiniteDuration
startupCheckInterval: FiniteDuration,
retryInterval: FiniteDuration
) extends SinkConfig
final case class Kafka(
maxBytes: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ abstract class ConfigSpec extends Specification {
maxRpcTimeout = 10000,
rpcTimeoutMultiplier = 2
),
startupCheckInterval = 1.second
startupCheckInterval = 1.second,
retryInterval = 10.seconds
)
case "sqs" =>
Sqs(
Expand Down
7 changes: 6 additions & 1 deletion examples/config.pubsub.extended.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,15 @@ collector {
# Default: 10 MB
maxBytes = 10000000

# When collector starts, it checks if PubSub topics exist with listTopics.
# Optional. When the collector starts, it checks whether the PubSub topics exist by calling listTopics.
# This is the interval between these calls.
# /sink-health becomes healthy as soon as these requests succeed or records are successfully inserted.
startupCheckInterval = 1 second

# Optional. The collector relies on the built-in retry mechanism of the PubSub API.
# If these retries fail, the events are added to a buffer,
# and every retryInterval the collector tries to send them again.
retryInterval = 10 seconds
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ class KinesisSink private (
}

private def checkKinesisHealth(): Unit = {
val healthThread = new Runnable {
val healthRunnable = new Runnable {
override def run() {
while (!kinesisHealthy) {
Try {
Expand All @@ -373,11 +373,11 @@ class KinesisSink private (
}
}
}
executorService.execute(healthThread)
executorService.execute(healthRunnable)
}

private def checkSqsHealth(): Unit = maybeSqs.foreach { sqs =>
val healthThread = new Runnable {
val healthRunnable = new Runnable {
override def run() {
while (!sqsHealthy) {
Try {
Expand All @@ -393,7 +393,7 @@ class KinesisSink private (
}
}
}
executorService.execute(healthThread)
executorService.execute(healthRunnable)
}
}

Expand Down
1 change: 1 addition & 0 deletions pubsub/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ collector {
maxBytes = 10000000

startupCheckInterval = 1 second
retryInterval = 10 seconds
}

buffer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package sinks
import java.util.concurrent.Executors

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import scala.util._
import scala.concurrent.duration.{FiniteDuration, MILLISECONDS}
import scala.concurrent.duration._

import org.threeten.bp.Duration

Expand All @@ -42,10 +44,18 @@ import cats.syntax.either._

import com.snowplowanalytics.snowplow.collectors.scalastream.model._

class GooglePubSubSink private (val maxBytes: Int, publisher: Publisher, projectId: String, topicName: String)
extends Sink {
private val logExecutor = Executors.newSingleThreadExecutor()
private val checkExecutor = Executors.newSingleThreadExecutor()
class GooglePubSubSink private (
val maxBytes: Int,
publisher: Publisher,
projectId: String,
topicName: String,
retryInterval: FiniteDuration
) extends Sink {
private val logExecutor = Executors.newSingleThreadExecutor()
// 2 = 1 for health check + 1 for retrying failed inserts
private val scheduledExecutor = Executors.newScheduledThreadPool(2)

private val failedInsertsBuffer = ListBuffer.empty[Array[Byte]]

@volatile private var pubsubHealthy: Boolean = false
override def isHealthy: Boolean = pubsubHealthy
Expand Down Expand Up @@ -75,6 +85,9 @@ class GooglePubSubSink private (val maxBytes: Int, publisher: Publisher, project
)
case t => log.error(s"Publishing message to $topicName failed with error: ${t.getMessage}")
}
failedInsertsBuffer.synchronized {
failedInsertsBuffer.prepend(event)
}
}
},
logExecutor
Expand All @@ -85,8 +98,8 @@ class GooglePubSubSink private (val maxBytes: Int, publisher: Publisher, project

/**
 * Shuts the sink down: stops the publisher, then stops each executor referenced below
 * and waits up to 10000 ms for its tasks to terminate.
 *
 * NOTE(review): logExecutor is not shut down in this method — confirm whether that is intentional.
 */
override def shutdown(): Unit = {
publisher.shutdown()
checkExecutor.shutdown()
checkExecutor.awaitTermination(10000, MILLISECONDS)
scheduledExecutor.shutdown()
scheduledExecutor.awaitTermination(10000, MILLISECONDS)
// Explicit Unit: the result of awaitTermination is deliberately discarded.
()
}

Expand All @@ -98,11 +111,28 @@ class GooglePubSubSink private (val maxBytes: Int, publisher: Publisher, project
/**
 * Wraps a raw event in a PubSub message.
 *
 * @param event the serialized event bytes; they are copied into the message payload
 * @return a message whose data field holds a copy of `event`
 */
private def eventToPubsubMessage(event: Array[Byte]): PubsubMessage = {
  val payload = ByteString.copyFrom(event)
  PubsubMessage.newBuilder().setData(payload).build()
}

/**
 * Builds the task that re-sends events whose publication to PubSub failed.
 *
 * Each run drains `failedInsertsBuffer`, hands the drained records to `storeRawEvents`
 * and then schedules the next run `retryInterval` later. A failure while re-sending is
 * logged and never prevents the next run from being scheduled.
 */
private def retryRunnable: Runnable = new Runnable {
  override def run(): Unit =
    try {
      // Snapshot and clear under the buffer's lock so that records added concurrently
      // by the publish callback are either part of this batch or kept for the next one.
      val failedInserts = failedInsertsBuffer.synchronized {
        val records = failedInsertsBuffer.toList
        failedInsertsBuffer.clear()
        records
      }
      if (failedInserts.nonEmpty) {
        log.info(s"Retrying to insert ${failedInserts.size} records into $topicName")
        storeRawEvents(failedInserts, "NOT USED")
      }
    } catch {
      // Without this, an exception would be swallowed by the discarded ScheduledFuture
      // and the retry loop would stop silently.
      // NOTE(review): the drained records are not put back in the buffer because some of them
      // may already have been handed to the publisher — confirm the desired semantics.
      case scala.util.control.NonFatal(e) =>
        log.error(s"Retrying to insert records into $topicName failed with error: ${e.getMessage}")
    } finally {
      // Always re-arm the retry, unless the sink is shutting down.
      if (!scheduledExecutor.isShutdown)
        try {
          scheduledExecutor.schedule(retryRunnable, retryInterval.toMillis, MILLISECONDS)
          ()
        } catch {
          // shutdown() raced with the check above: nothing left to schedule.
          case _: java.util.concurrent.RejectedExecutionException => ()
        }
    }
}
// Kick off the first retry run when the sink is constructed.
scheduledExecutor.schedule(retryRunnable, retryInterval.toMillis, MILLISECONDS)

private def checkPubsubHealth(
customProviders: Option[(TransportChannelProvider, CredentialsProvider)],
startupCheckInterval: FiniteDuration
): Unit = {
val healthThread = new Runnable {
val healthRunnable = new Runnable {
override def run() {
val topicAdmin = GooglePubSubSink.createTopicAdmin(customProviders)

Expand All @@ -127,7 +157,7 @@ class GooglePubSubSink private (val maxBytes: Int, publisher: Publisher, project
}
}
}
checkExecutor.execute(healthThread)
scheduledExecutor.execute(healthRunnable)
}
}

Expand All @@ -149,8 +179,14 @@ object GooglePubSubSink {
(channelProvider, credentialsProvider)
}
publisher <- createPublisher(googlePubSubConfig.googleProjectId, topicName, batching, retry, customProviders)
sink = new GooglePubSubSink(maxBytes, publisher, googlePubSubConfig.googleProjectId, topicName)
_ = sink.checkPubsubHealth(customProviders, googlePubSubConfig.startupCheckInterval)
sink = new GooglePubSubSink(
maxBytes,
publisher,
googlePubSubConfig.googleProjectId,
topicName,
googlePubSubConfig.retryInterval
)
_ = sink.checkPubsubHealth(customProviders, googlePubSubConfig.startupCheckInterval)
} yield sink

private val UserAgent = s"snowplow/stream-collector-${generated.BuildInfo.version}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ class SqsSink private (
}

private def checkSqsHealth(): Unit = {
val healthThread = new Runnable {
val healthRunnable = new Runnable {
override def run() {
while (!sqsHealthy) {
Try {
Expand All @@ -262,7 +262,7 @@ class SqsSink private (
}
}
}
executorService.execute(healthThread)
executorService.execute(healthRunnable)
}
}

Expand Down

0 comments on commit a6d679a

Please sign in to comment.