Skip to content

Commit

Permalink
PubSub: add second layer of retry (close #304)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Apr 4, 2023
1 parent 54d7622 commit 23b187a
Show file tree
Hide file tree
Showing 18 changed files with 355 additions and 159 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2023-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0, and
* you may not use this file except in compliance with the Apache License
* Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Apache License Version 2.0 is distributed on an "AS
* IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.collectors.scalastream.it

import org.testcontainers.containers.GenericContainer

case class CollectorContainer(
container: GenericContainer[_],
host: String,
port: Int
)
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ package model {
maxBytes: Int,
googleProjectId: String,
backoffPolicy: GooglePubSubBackoffPolicyConfig,
startupCheckInterval: FiniteDuration
startupCheckInterval: FiniteDuration,
retryInterval: FiniteDuration
) extends SinkConfig
final case class Kafka(
maxBytes: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ abstract class ConfigSpec extends Specification {
maxRpcTimeout = 10000,
rpcTimeoutMultiplier = 2
),
startupCheckInterval = 1.second
startupCheckInterval = 1.second,
retryInterval = 10.seconds
)
case "sqs" =>
Sqs(
Expand Down
7 changes: 6 additions & 1 deletion examples/config.pubsub.extended.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,15 @@ collector {
# Default: 10 MB
maxBytes = 10000000

# When collector starts, it checks if PubSub topics exist with listTopics.
# Optional. When collector starts, it checks if PubSub topics exist with listTopics.
# This is the interval for the calls.
# /sink-health is made healthy as soon as requests are successful or records are successfully inserted.
startupCheckInterval = 1 second

# Optional. Collector uses built-in retry mechanism of PubSub API.
# In case of failure of these retries, the events are added to a buffer
# and every retryInterval collector retries to send them.
retryInterval = 10 seconds
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class CookieSpec extends Specification with Localstack with CatsIO {
testName,
streamGood,
streamBad,
Some(
additionalConfig = Some(
mkConfig(
name = name,
expiration = expiration,
Expand Down Expand Up @@ -98,7 +98,7 @@ class CookieSpec extends Specification with Localstack with CatsIO {
testName,
streamGood,
streamBad,
Some(
additionalConfig = Some(
mkConfig(
secure = secure,
httpOnly = httpOnly
Expand Down Expand Up @@ -131,7 +131,7 @@ class CookieSpec extends Specification with Localstack with CatsIO {
testName,
streamGood,
streamBad,
Some(mkConfig())
additionalConfig = Some(mkConfig())
).use { collector =>
val request = EventGenerator.mkTp2Event(collector.host, collector.port)
.withHeaders(Header("SP-Anonymous", "*"))
Expand All @@ -154,7 +154,7 @@ class CookieSpec extends Specification with Localstack with CatsIO {
testName,
streamGood,
streamBad,
Some(mkConfig())
additionalConfig = Some(mkConfig())
).use { collector =>
val request = EventGenerator.mkTp2Event(collector.host, collector.port)
.withHeaders(Header("Origin", "http://my.domain"))
Expand All @@ -181,7 +181,7 @@ class CookieSpec extends Specification with Localstack with CatsIO {
testName,
streamGood,
streamBad,
Some(mkConfig(
additionalConfig = Some(mkConfig(
domains = Some(List(domain, subDomain)),
fallbackDomain = Some(fallbackDomain)
))
Expand Down Expand Up @@ -214,7 +214,7 @@ class CookieSpec extends Specification with Localstack with CatsIO {
testName,
streamGood,
streamBad,
Some(mkConfig(
additionalConfig = Some(mkConfig(
domains = Some(List(domain)),
fallbackDomain = Some(fallbackDomain)
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class CustomPathsSpec extends Specification with Localstack with CatsIO {
testName,
streamGood,
streamBad,
Some(config)
additionalConfig = Some(config)
).use { collector =>
val requests = originalPaths.map { p =>
val uri = Uri.unsafeFromString(s"http://${collector.host}:${collector.port}$p")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class DoNotTrackCookieSpec extends Specification with Localstack with CatsIO {
testName,
streamGood,
streamBad,
Some(mkConfig(true, cookieName, cookieValue))
additionalConfig = Some(mkConfig(true, cookieName, cookieValue))
).use { collector =>
val requests = List(
EventGenerator.mkTp2Event(collector.host, collector.port).addCookie(cookieName, cookieName),
Expand Down Expand Up @@ -80,7 +80,7 @@ class DoNotTrackCookieSpec extends Specification with Localstack with CatsIO {
testName,
streamGood,
streamBad,
Some(mkConfig(false, cookieName, cookieValue))
additionalConfig = Some(mkConfig(false, cookieName, cookieValue))
).use { collector =>
val request = EventGenerator.mkTp2Event(collector.host, collector.port).addCookie(cookieName, cookieValue)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ import com.amazonaws.services.kinesis.model.{GetRecordsRequest, Record}

import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload

import com.snowplowanalytics.snowplow.badrows.BadRow

import com.snowplowanalytics.snowplow.collectors.scalastream.it.CollectorOutput
import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._

import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers.Localstack
import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._
import com.snowplowanalytics.snowplow.badrows.BadRow

object Kinesis {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import cats.effect.IO

import cats.effect.testing.specs2.CatsIO

import org.http4s.{Request, Method, Uri, Status}

import org.specs2.mutable.Specification

import org.testcontainers.containers.GenericContainer

import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._
import com.snowplowanalytics.snowplow.collectors.scalastream.it.EventGenerator
import com.snowplowanalytics.snowplow.collectors.scalastream.it.{EventGenerator, Http}

import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._

Expand Down Expand Up @@ -101,5 +103,45 @@ class KinesisCollectorSpec extends Specification with Localstack with CatsIO {
}
}
}

"start with /sink-health unhealthy and insert pending events when streams become available" in {
val testName = "sink-health"
val nbGood = 10
val nbBad = 10
val streamGood = s"${testName}-raw"
val streamBad = s"${testName}-bad-1"

Collector.container(
"kinesis/src/it/resources/collector.hocon",
testName,
streamGood,
streamBad,
createStreams = false
).use { collector =>
val uri = Uri.unsafeFromString(s"http://${collector.host}:${collector.port}/sink-health")
val request = Request[IO](Method.GET, uri)

for {
statusBeforeCreate <- Http.status(request)
_ <- EventGenerator.sendEvents(
collector.host,
collector.port,
nbGood,
nbBad,
Collector.maxBytes
)
_ <- Localstack.createStreams(List(streamGood, streamBad))
_ <- IO.sleep(10.second)
statusAfterCreate <- Http.status(request)
collectorOutput <- Kinesis.readOutput(streamGood, streamBad)
_ <- printBadRows(testName, collectorOutput.bad)
} yield {
statusBeforeCreate should beEqualTo(Status.ServiceUnavailable)
statusAfterCreate should beEqualTo(Status.Ok)
collectorOutput.good.size should beEqualTo(nbGood)
collectorOutput.bad.size should beEqualTo(nbBad)
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/
package com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers

import org.testcontainers.containers.{BindMode, GenericContainer => JGenericContainer}
import org.testcontainers.containers.BindMode
import org.testcontainers.containers.wait.strategy.Wait

import com.dimafeng.testcontainers.GenericContainer
Expand All @@ -24,6 +24,7 @@ import cats.effect.{IO, Resource}
import com.snowplowanalytics.snowplow.collectors.scalastream.generated.ProjectMetadata

import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._
import com.snowplowanalytics.snowplow.collectors.scalastream.it.CollectorContainer

object Collector {

Expand All @@ -35,8 +36,9 @@ object Collector {
testName: String,
streamGood: String,
streamBad: String,
createStreams: Boolean = true,
additionalConfig: Option[String] = None
): Resource[IO, Collector] = {
): Resource[IO, CollectorContainer] = {
val container = GenericContainer(
dockerImage = s"snowplow/scala-stream-collector-kinesis:${ProjectMetadata.dockerTag}",
env = Map(
Expand All @@ -47,7 +49,8 @@ object Collector {
"STREAM_BAD" -> streamBad,
"REGION" -> Localstack.region,
"KINESIS_ENDPOINT" -> Localstack.privateEndpoint,
"MAX_BYTES" -> maxBytes.toString()
"MAX_BYTES" -> maxBytes.toString,
"JDK_JAVA_OPTIONS" -> "-Dorg.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink=warn"
) ++ configParameters(additionalConfig),
exposedPorts = Seq(port),
fileSystemBind = Seq(
Expand All @@ -64,10 +67,13 @@ object Collector {
waitStrategy = Wait.forLogMessage(s".*REST interface bound to.*", 1)
)
container.container.withNetwork(Localstack.network)
Resource.make (
Localstack.createStreams(List(streamGood, streamBad)) *>

val create = if(createStreams) Localstack.createStreams(List(streamGood, streamBad)) else IO.unit

Resource.make(
create *>
IO(startContainerWithLogs(container.container, testName))
.map(c => Collector(c, c.getHost, c.getMappedPort(Collector.port)))
.map(c => CollectorContainer(c, c.getHost, c.getMappedPort(Collector.port)))
)(
c => IO(c.container.stop())
)
Expand All @@ -81,9 +87,3 @@ object Collector {
Map("JDK_JAVA_OPTIONS" -> fields)
}
}

case class Collector(
container: JGenericContainer[_],
host: String,
port: Int
)
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ class KinesisSink private (
}

private def checkKinesisHealth(): Unit = {
val healthThread = new Runnable {
val healthRunnable = new Runnable {
override def run() {
while (!kinesisHealthy) {
Try {
Expand All @@ -373,11 +373,11 @@ class KinesisSink private (
}
}
}
executorService.execute(healthThread)
executorService.execute(healthRunnable)
}

private def checkSqsHealth(): Unit = maybeSqs.foreach { sqs =>
val healthThread = new Runnable {
val healthRunnable = new Runnable {
override def run() {
while (!sqsHealthy) {
Try {
Expand All @@ -393,7 +393,7 @@ class KinesisSink private (
}
}
}
executorService.execute(healthThread)
executorService.execute(healthRunnable)
}
}

Expand Down
14 changes: 14 additions & 0 deletions pubsub/src/it/resources/collector.hocon
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
collector {
interface = "0.0.0.0"
port = ${PORT}

streams {
good = ${TOPIC_GOOD}
bad = ${TOPIC_BAD}

sink {
googleProjectId = ${GOOGLE_PROJECT_ID}
maxBytes = ${MAX_BYTES}
}
}
}
Loading

0 comments on commit 23b187a

Please sign in to comment.