Skip to content

Commit

Permalink
Make MaxRetries configurable (close #295)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Mar 28, 2023
1 parent 345c081 commit acfc82a
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ package model {
final case class P3PConfig(policyRef: String, CP: String)
final case class CrossDomainConfig(enabled: Boolean, domains: List[String], secure: Boolean)
final case class CORSConfig(accessControlMaxAge: FiniteDuration)
final case class KinesisBackoffPolicyConfig(minBackoff: Long, maxBackoff: Long)
final case class SqsBackoffPolicyConfig(minBackoff: Long, maxBackoff: Long)
final case class KinesisBackoffPolicyConfig(minBackoff: Long, maxBackoff: Long, maxRetries: Int)
final case class SqsBackoffPolicyConfig(minBackoff: Long, maxBackoff: Long, maxRetries: Int)
final case class GooglePubSubBackoffPolicyConfig(
minBackoff: Long,
maxBackoff: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ object TestUtils {
region = "us-east-1",
threadPoolSize = 12,
aws = AWSConfig("cpf", "cpf"),
backoffPolicy = KinesisBackoffPolicyConfig(3000L, 60000L),
backoffPolicy = KinesisBackoffPolicyConfig(500L, 1500L, 3),
customEndpoint = None,
sqsGoodBuffer = Some("good-buffer"),
sqsBadBuffer = Some("bad-buffer"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ abstract class ConfigSpec extends Specification {
secretKey = "iam"
),
backoffPolicy = SqsBackoffPolicyConfig(
minBackoff = 3000,
maxBackoff = 600000
minBackoff = 500,
maxBackoff = 1500,
maxRetries = 3
)
)
case "stdout" => Stdout(maxBytes = 1000000000)
Expand All @@ -145,8 +146,9 @@ abstract class ConfigSpec extends Specification {
secretKey = "iam"
),
backoffPolicy = KinesisBackoffPolicyConfig(
minBackoff = 3000,
maxBackoff = 600000
minBackoff = 500,
maxBackoff = 1500,
maxRetries = 3
),
sqsBadBuffer = None,
sqsGoodBuffer = None,
Expand Down
12 changes: 9 additions & 3 deletions examples/config.kinesis.extended.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,16 @@ collector {
secretKey = iam
}

# Minimum and maximum backoff periods, in milliseconds
# Optional
backoffPolicy {
minBackoff = 3000
maxBackoff = 600000
# Minimum backoff period in milliseconds
minBackoff = 500
# Maximum backoff period in milliseconds
maxBackoff = 1500
# Failed inserts are retried forever.
# In case of just Kinesis without SQS, number of retries before setting /sink-health unhealthy.
# In case of Kinesis + SQS, number of retries with one before retrying with the other.
maxRetries = 3
}

# Optional. Maximum number of bytes that a single record can contain.
Expand Down
11 changes: 8 additions & 3 deletions examples/config.sqs.extended.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,15 @@ collector {
secretKey = ${?COLLECTOR_STREAMS_SINK_AWS_SECRET_KEY}
}

# Minimum and maximum backoff periods, in milliseconds
# Optional
backoffPolicy {
minBackoff = 3000
maxBackoff = 600000
# Minimum backoff period in milliseconds
minBackoff = 500
# Maximum backoff period in milliseconds
maxBackoff = 1500
# Failed inserts are retried forever.
# Number of retries before setting /sink-health unhealthy.
maxRetries = 3
}

# Optional. Maximum number of bytes that a single record can contain.
Expand Down
5 changes: 3 additions & 2 deletions kinesis/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ collector {
}

backoffPolicy {
minBackoff = 3000
maxBackoff = 600000
minBackoff = 500
maxBackoff = 1500
maxRetries = 3
}

maxBytes = 1000000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ class KinesisSink private (

private val maxBackoff = kinesisConfig.backoffPolicy.maxBackoff
private val minBackoff = kinesisConfig.backoffPolicy.minBackoff
private val maxRetries = kinesisConfig.backoffPolicy.maxRetries
private val randomGenerator = new java.util.Random()

private val MaxSqsBatchSizeN = 10
private val MaxRetries = 3 // TODO: make this configurable

implicit lazy val ec: ExecutionContextExecutorService =
concurrent.ExecutionContext.fromExecutorService(executorService)
Expand Down Expand Up @@ -138,13 +138,13 @@ class KinesisSink private (
if (batch.nonEmpty) maybeSqs match {
// Kinesis healthy
case _ if kinesisHealhy =>
writeBatchToKinesisWithRetries(batch, minBackoff, MaxRetries)
writeBatchToKinesisWithRetries(batch, minBackoff, maxRetries)
// No SQS buffer
case None =>
writeBatchToKinesisWithRetries(batch, minBackoff, MaxRetries)
writeBatchToKinesisWithRetries(batch, minBackoff, maxRetries)
// Kinesis not healthy and SQS buffer defined
case Some(sqs) =>
writeBatchToSqsWithRetries(batch, sqs, minBackoff, MaxRetries)
writeBatchToSqsWithRetries(batch, sqs, minBackoff, maxRetries)
}

def writeBatchToKinesisWithRetries(batch: List[Events], nextBackoff: Long, retriesLeft: Int): Unit = {
Expand Down Expand Up @@ -220,10 +220,10 @@ class KinesisSink private (
log.error(
s"SQS buffer ${sqs.sqsBufferName} defined for stream $streamName. Retrying to send the events to SQS"
)
writeBatchToSqsWithRetries(failedRecords, sqs, minBackoff, MaxRetries)
writeBatchToSqsWithRetries(failedRecords, sqs, minBackoff, maxRetries)
case None =>
log.error(s"No SQS buffer defined for stream $streamName. Retrying to send the events to Kinesis")
writeBatchToKinesisWithRetries(failedRecords, maxBackoff, MaxRetries)
writeBatchToKinesisWithRetries(failedRecords, maxBackoff, maxRetries)
}
}

Expand All @@ -238,7 +238,7 @@ class KinesisSink private (
log.error(
s"Maximum number of retries reached for SQS buffer ${sqs.sqsBufferName} for ${failedRecords.size} records. Retrying in Kinesis"
)
writeBatchToKinesisWithRetries(failedRecords, minBackoff, MaxRetries)
writeBatchToKinesisWithRetries(failedRecords, minBackoff, maxRetries)
}

def writeBatchToKinesis(batch: List[Events]): Future[PutRecordsResult] =
Expand Down
5 changes: 3 additions & 2 deletions sqs/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ collector {
}

backoffPolicy {
minBackoff = 3000
maxBackoff = 600000
minBackoff = 500
maxBackoff = 1500
maxRetries = 3
}

maxBytes = 192000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ class SqsSink private (

private val maxBackoff: Long = sqsConfig.backoffPolicy.maxBackoff
private val minBackoff: Long = sqsConfig.backoffPolicy.minBackoff
private val maxRetries: Int = sqsConfig.backoffPolicy.maxRetries
private val randomGenerator: Random = new java.util.Random()

private val MaxSqsBatchSizeN = 10
private val MaxRetries = 3 // TODO: make this configurable

implicit lazy val ec: ExecutionContextExecutorService =
concurrent.ExecutionContext.fromExecutorService(executorService)
Expand Down Expand Up @@ -93,7 +93,7 @@ class SqsSink private (
evts
}
lastFlushedTime = System.currentTimeMillis()
sinkBatch(eventsToSend, minBackoff, MaxRetries)
sinkBatch(eventsToSend, minBackoff, maxRetries)
}

def getLastFlushTime: Long = lastFlushedTime
Expand Down Expand Up @@ -165,7 +165,7 @@ class SqsSink private (
log.error(
s"Maximum number of retries reached for SQS queue $queueName for ${failedRecords.size} records"
)
scheduleWrite(failedRecords, maxBackoff, MaxRetries)
scheduleWrite(failedRecords, maxBackoff, maxRetries)
}

/**
Expand Down

0 comments on commit acfc82a

Please sign in to comment.