From acfc82a4254729712b234ef849d1cd38c51d27b4 Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Tue, 28 Mar 2023 09:58:26 +0200 Subject: [PATCH] Make MaxRetries configurable (close #295) --- .../model.scala | 4 ++-- .../TestUtils.scala | 2 +- .../config/ConfigSpec.scala | 10 ++++++---- examples/config.kinesis.extended.hocon | 12 +++++++++--- examples/config.sqs.extended.hocon | 11 ++++++++--- kinesis/src/main/resources/application.conf | 5 +++-- .../sinks/KinesisSink.scala | 14 +++++++------- sqs/src/main/resources/application.conf | 5 +++-- .../sinks/SqsSink.scala | 6 +++--- 9 files changed, 42 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala index 4a908a8b5..879fc363c 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala @@ -92,8 +92,8 @@ package model { final case class P3PConfig(policyRef: String, CP: String) final case class CrossDomainConfig(enabled: Boolean, domains: List[String], secure: Boolean) final case class CORSConfig(accessControlMaxAge: FiniteDuration) - final case class KinesisBackoffPolicyConfig(minBackoff: Long, maxBackoff: Long) - final case class SqsBackoffPolicyConfig(minBackoff: Long, maxBackoff: Long) + final case class KinesisBackoffPolicyConfig(minBackoff: Long, maxBackoff: Long, maxRetries: Int) + final case class SqsBackoffPolicyConfig(minBackoff: Long, maxBackoff: Long, maxRetries: Int) final case class GooglePubSubBackoffPolicyConfig( minBackoff: Long, maxBackoff: Long, diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestUtils.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestUtils.scala index da93295da..5d7dbe841 100644 --- 
a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestUtils.scala +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestUtils.scala @@ -52,7 +52,7 @@ object TestUtils { region = "us-east-1", threadPoolSize = 12, aws = AWSConfig("cpf", "cpf"), - backoffPolicy = KinesisBackoffPolicyConfig(3000L, 60000L), + backoffPolicy = KinesisBackoffPolicyConfig(500L, 1500L, 3), customEndpoint = None, sqsGoodBuffer = Some("good-buffer"), sqsBadBuffer = Some("bad-buffer"), diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/config/ConfigSpec.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/config/ConfigSpec.scala index ce24e4098..6fdb542d1 100644 --- a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/config/ConfigSpec.scala +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/config/ConfigSpec.scala @@ -130,8 +130,9 @@ abstract class ConfigSpec extends Specification { secretKey = "iam" ), backoffPolicy = SqsBackoffPolicyConfig( - minBackoff = 3000, - maxBackoff = 600000 + minBackoff = 500, + maxBackoff = 1500, + maxRetries = 3 ) ) case "stdout" => Stdout(maxBytes = 1000000000) @@ -145,8 +146,9 @@ abstract class ConfigSpec extends Specification { secretKey = "iam" ), backoffPolicy = KinesisBackoffPolicyConfig( - minBackoff = 3000, - maxBackoff = 600000 + minBackoff = 500, + maxBackoff = 1500, + maxRetries = 3 ), sqsBadBuffer = None, sqsGoodBuffer = None, diff --git a/examples/config.kinesis.extended.hocon b/examples/config.kinesis.extended.hocon index ba2ef0f1f..5a802f89d 100644 --- a/examples/config.kinesis.extended.hocon +++ b/examples/config.kinesis.extended.hocon @@ -219,10 +219,16 @@ collector { secretKey = iam } - # Minimum and maximum backoff periods, in milliseconds + # Optional backoffPolicy { - minBackoff = 3000 - maxBackoff = 600000 + # Minimum backoff period in milliseconds + minBackoff = 500 + # 
Maximum backoff period in milliseconds + maxBackoff = 1500 + # Failed inserts are retried forever; maxRetries does not limit the total number of attempts. + # With Kinesis only (no SQS buffer): number of retries before /sink-health is set to unhealthy. + # With Kinesis + SQS buffer: number of retries against one sink before switching to the other. + maxRetries = 3 } # Optional. Maximum number of bytes that a single record can contain. diff --git a/examples/config.sqs.extended.hocon b/examples/config.sqs.extended.hocon index 7c10c130d..233561541 100644 --- a/examples/config.sqs.extended.hocon +++ b/examples/config.sqs.extended.hocon @@ -202,10 +202,15 @@ collector { secretKey = ${?COLLECTOR_STREAMS_SINK_AWS_SECRET_KEY} } - # Minimum and maximum backoff periods, in milliseconds + # Optional backoffPolicy { - minBackoff = 3000 - maxBackoff = 600000 + # Minimum backoff period in milliseconds + minBackoff = 500 + # Maximum backoff period in milliseconds + maxBackoff = 1500 + # Failed inserts are retried forever; maxRetries does not limit the total number of attempts. + # Number of retries before /sink-health is set to unhealthy. + maxRetries = 3 } # Optional. Maximum number of bytes that a single record can contain. 
diff --git a/kinesis/src/main/resources/application.conf b/kinesis/src/main/resources/application.conf index db505bf55..fb0fa8189 100644 --- a/kinesis/src/main/resources/application.conf +++ b/kinesis/src/main/resources/application.conf @@ -10,8 +10,9 @@ collector { } backoffPolicy { - minBackoff = 3000 - maxBackoff = 600000 + minBackoff = 500 + maxBackoff = 1500 + maxRetries = 3 } maxBytes = 1000000 diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala index 50bdaba8c..754ccdc2c 100644 --- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala +++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala @@ -61,10 +61,10 @@ class KinesisSink private ( private val maxBackoff = kinesisConfig.backoffPolicy.maxBackoff private val minBackoff = kinesisConfig.backoffPolicy.minBackoff + private val maxRetries = kinesisConfig.backoffPolicy.maxRetries private val randomGenerator = new java.util.Random() private val MaxSqsBatchSizeN = 10 - private val MaxRetries = 3 // TODO: make this configurable implicit lazy val ec: ExecutionContextExecutorService = concurrent.ExecutionContext.fromExecutorService(executorService) @@ -138,13 +138,13 @@ class KinesisSink private ( if (batch.nonEmpty) maybeSqs match { // Kinesis healthy case _ if kinesisHealhy => - writeBatchToKinesisWithRetries(batch, minBackoff, MaxRetries) + writeBatchToKinesisWithRetries(batch, minBackoff, maxRetries) // No SQS buffer case None => - writeBatchToKinesisWithRetries(batch, minBackoff, MaxRetries) + writeBatchToKinesisWithRetries(batch, minBackoff, maxRetries) // Kinesis not healthy and SQS buffer defined case Some(sqs) => - writeBatchToSqsWithRetries(batch, sqs, minBackoff, MaxRetries) + writeBatchToSqsWithRetries(batch, sqs, minBackoff, maxRetries) } def 
writeBatchToKinesisWithRetries(batch: List[Events], nextBackoff: Long, retriesLeft: Int): Unit = { @@ -220,10 +220,10 @@ class KinesisSink private ( log.error( s"SQS buffer ${sqs.sqsBufferName} defined for stream $streamName. Retrying to send the events to SQS" ) - writeBatchToSqsWithRetries(failedRecords, sqs, minBackoff, MaxRetries) + writeBatchToSqsWithRetries(failedRecords, sqs, minBackoff, maxRetries) case None => log.error(s"No SQS buffer defined for stream $streamName. Retrying to send the events to Kinesis") - writeBatchToKinesisWithRetries(failedRecords, maxBackoff, MaxRetries) + writeBatchToKinesisWithRetries(failedRecords, maxBackoff, maxRetries) } } @@ -238,7 +238,7 @@ class KinesisSink private ( log.error( s"Maximum number of retries reached for SQS buffer ${sqs.sqsBufferName} for ${failedRecords.size} records. Retrying in Kinesis" ) - writeBatchToKinesisWithRetries(failedRecords, minBackoff, MaxRetries) + writeBatchToKinesisWithRetries(failedRecords, minBackoff, maxRetries) } def writeBatchToKinesis(batch: List[Events]): Future[PutRecordsResult] = diff --git a/sqs/src/main/resources/application.conf b/sqs/src/main/resources/application.conf index b92c0779f..37f54a280 100644 --- a/sqs/src/main/resources/application.conf +++ b/sqs/src/main/resources/application.conf @@ -10,8 +10,9 @@ collector { } backoffPolicy { - minBackoff = 3000 - maxBackoff = 600000 + minBackoff = 500 + maxBackoff = 1500 + maxRetries = 3 } maxBytes = 192000 diff --git a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala index df3ce6c65..538944ea1 100644 --- a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala +++ b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala @@ -53,10 +53,10 @@ class SqsSink private ( private val maxBackoff: Long = 
sqsConfig.backoffPolicy.maxBackoff private val minBackoff: Long = sqsConfig.backoffPolicy.minBackoff + private val maxRetries: Int = sqsConfig.backoffPolicy.maxRetries private val randomGenerator: Random = new java.util.Random() private val MaxSqsBatchSizeN = 10 - private val MaxRetries = 3 // TODO: make this configurable implicit lazy val ec: ExecutionContextExecutorService = concurrent.ExecutionContext.fromExecutorService(executorService) @@ -93,7 +93,7 @@ class SqsSink private ( evts } lastFlushedTime = System.currentTimeMillis() - sinkBatch(eventsToSend, minBackoff, MaxRetries) + sinkBatch(eventsToSend, minBackoff, maxRetries) } def getLastFlushTime: Long = lastFlushedTime @@ -165,7 +165,7 @@ class SqsSink private ( log.error( s"Maximum number of retries reached for SQS queue $queueName for ${failedRecords.size} records" ) - scheduleWrite(failedRecords, maxBackoff, MaxRetries) + scheduleWrite(failedRecords, maxBackoff, maxRetries) } /**