From f61b289337b5fe772d2c909b9e69d8e2bab16c2e Mon Sep 17 00:00:00 2001 From: colmsnowplow Date: Wed, 13 Sep 2023 10:47:34 +0200 Subject: [PATCH] Wrap nsq sink with cats effects (close #348) --- build.sbt | 9 +- nsq/src/main/resources/application.conf | 23 ---- .../NsqCollector.scala | 43 +++---- .../sinks/NsqSink.scala | 64 ++++++++++- .../sinks/NsqSinkConfig.scala | 36 ++++++ .../NsqConfigSpec.scala | 106 +++++++++++++++++- project/Dependencies.scala | 2 + 7 files changed, 228 insertions(+), 55 deletions(-) create mode 100644 nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSinkConfig.scala diff --git a/build.sbt b/build.sbt index 782591444..965596bda 100644 --- a/build.sbt +++ b/build.sbt @@ -261,12 +261,15 @@ lazy val kafkaDistroless = project .dependsOn(core % "test->test;compile->compile") lazy val nsqSettings = - allSettings ++ buildInfoSettings ++ Seq( + allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Seq( moduleName := "snowplow-stream-collector-nsq", Docker / packageName := "scala-stream-collector-nsq", + buildInfoPackage := s"com.snowplowanalytics.snowplow.collectors.scalastream", libraryDependencies ++= Seq( + Dependencies.Libraries.catsRetry, Dependencies.Libraries.nsqClient, Dependencies.Libraries.jackson, + Dependencies.Libraries.nettyAll, Dependencies.Libraries.log4j ) ) @@ -274,14 +277,14 @@ lazy val nsqSettings = lazy val nsq = project .settings(nsqSettings) .enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin) - .dependsOn(core % "test->test;compile->compile") + .dependsOn(http4s % "test->test;compile->compile") lazy val nsqDistroless = project .in(file("distroless/nsq")) .settings(sourceDirectory := (nsq / sourceDirectory).value) .settings(nsqSettings) .enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin) - .dependsOn(core % "test->test;compile->compile") + .dependsOn(http4s % "test->test;compile->compile") lazy val stdoutSettings = allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Seq( diff --git a/nsq/src/main/resources/application.conf b/nsq/src/main/resources/application.conf index 0d1ae5709..1df27cd22 100644 --- a/nsq/src/main/resources/application.conf +++ b/nsq/src/main/resources/application.conf @@ -14,26 +14,3 @@ collector { } } } - - -akka { - loglevel = WARNING - loggers = ["akka.event.slf4j.Slf4jLogger"] - - http.server { - remote-address-header = on - raw-request-uri-header = on - - parsing { - max-uri-length = 32768 - uri-parsing-mode = relaxed - illegal-header-warnings = off - } - - max-connections = 2048 - } - - coordinated-shutdown { - run-by-jvm-shutdown-hook = off - } -} diff --git a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqCollector.scala b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqCollector.scala index 44bdd04f0..81969cbb3 100644 --- a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqCollector.scala +++ b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqCollector.scala @@ -14,28 +14,29 @@ */ package com.snowplowanalytics.snowplow.collectors.scalastream -import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo -import com.snowplowanalytics.snowplow.collectors.scalastream.model._ -import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.NsqSink -import com.snowplowanalytics.snowplow.collectors.scalastream.telemetry.TelemetryAkkaService +import java.util.concurrent.ScheduledThreadPoolExecutor -object NsqCollector extends Collector { - def appName = BuildInfo.shortName - def appVersion = BuildInfo.version - def scalaVersion = BuildInfo.scalaVersion +import cats.effect.{IO, Resource} +import com.snowplowanalytics.snowplow.collector.core.model.Sinks +import com.snowplowanalytics.snowplow.collector.core.{App, Config} +import com.snowplowanalytics.snowplow.collectors.scalastream.sinks._ - def main(args: Array[String]): Unit = { - val (collectorConf, akkaConf) = parseConfig(args) - val telemetry = TelemetryAkkaService.initWithCollector(collectorConf, BuildInfo.moduleName, appVersion) - val sinks = { - val goodStream = collectorConf.streams.good - val badStream = collectorConf.streams.bad - val (good, bad) = collectorConf.streams.sink match { - case nc: Nsq => (new NsqSink(nc.maxBytes, nc, goodStream), new NsqSink(nc.maxBytes, nc, badStream)) - case _ => throw new IllegalArgumentException("Configured sink is not NSQ") - } - CollectorSinks(good, bad) - } - run(collectorConf, akkaConf, sinks, telemetry) +object NsqCollector extends App[NsqSinkConfig](BuildInfo) { + override def mkSinks(config: Config.Streams[NsqSinkConfig]): Resource[IO, Sinks[IO]] = { + val threadPoolExecutor = new ScheduledThreadPoolExecutor(config.sink.threadPoolSize) + for { + good <- NsqSink.create[IO]( + config.sink.maxBytes, + config.sink, + config.good, + threadPoolExecutor + ) + bad <- NsqSink.create[IO]( + config.sink.maxBytes, + config.sink, + config.bad, + threadPoolExecutor + ) + } yield Sinks(good, bad) } } diff --git a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala index cd466e441..e5474192f 100644 --- a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala +++ b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala @@ -19,28 +19,82 @@ package com.snowplowanalytics.snowplow.collectors.scalastream package sinks +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.TimeoutException + import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContextExecutorService +import scala.concurrent.duration.MILLISECONDS + +import cats.effect.{Resource, Sync} import com.snowplowanalytics.client.nsq.NSQProducer -import com.snowplowanalytics.snowplow.collectors.scalastream.model._ +import com.snowplowanalytics.snowplow.collector.core.{Sink} +import com.snowplowanalytics.client.nsq.exceptions.NSQException /** * NSQ Sink for the Scala Stream Collector * @param nsqConfig Configuration for Nsq * @param topicName Nsq topic name */ -class NsqSink(val maxBytes: Int, nsqConfig: Nsq, topicName: String) extends Sink { +class NsqSink[F[_]: Sync] private ( + val maxBytes: Int, + nsqConfig: NsqSinkConfig, + topicName: String, + executorService: ScheduledExecutorService +) extends Sink[F] { + + // private lazy val log = LoggerFactory.getLogger(getClass()) + + implicit lazy val ec: ExecutionContextExecutorService = + concurrent.ExecutionContext.fromExecutorService(executorService) + + var healthStatus = true + + override def isHealthy: F[Boolean] = Sync[F].pure(healthStatus) //TODO: Figure out this implementation private val producer = new NSQProducer().addAddress(nsqConfig.host, nsqConfig.port).start() + def produceData(topicName: String, events: List[Array[Byte]]): Unit = + try { + producer.produceMulti(topicName, events.asJava) + } catch { + case e @ (_: NSQException | _: TimeoutException) => { + healthStatus = false + throw e + } + } + /** * Store raw events to the topic * @param events The list of events to send * @param key The partition key (unused) */ - override def storeRawEvents(events: List[Array[Byte]], key: String): Unit = - producer.produceMulti(topicName, events.asJava) + override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] = + Sync[F].delay(produceData(topicName, events)) - override def shutdown(): Unit = + def shutdown(): Unit = producer.shutdown() + executorService.shutdown() + executorService.awaitTermination(10000, MILLISECONDS) + () +} + +object NsqSink { + + def create[F[_]: Sync]( + maxBytes: Int, + nsqConfig: NsqSinkConfig, + // bufferConfig: Config.Buffer, + topicName: String, + executorService: ScheduledExecutorService + ): Resource[F, NsqSink[F]] = { + val acquire = Sync[F].delay( + new NsqSink(maxBytes, nsqConfig, topicName, executorService) + ) + + val release = (sink: NsqSink[F]) => (Sync[F].delay(sink.shutdown())) + + Resource.make(acquire)(release) + } } diff --git a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSinkConfig.scala b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSinkConfig.scala new file mode 100644 index 000000000..def8de5ee --- /dev/null +++ b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSinkConfig.scala @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2013-2022 Snowplow Analytics Ltd. + * All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache + * License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. + * + * See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. + */ + +package com.snowplowanalytics.snowplow.collectors.scalastream.sinks + +import io.circe.Decoder +import io.circe.generic.semiauto._ + +import com.snowplowanalytics.snowplow.collector.core.Config + +final case class NsqSinkConfig( + maxBytes: Int, + threadPoolSize: Int, + host: String, + port: Int +) extends Config.Sink + +object NsqSinkConfig { + implicit val configDecoder: Decoder[NsqSinkConfig] = deriveDecoder[NsqSinkConfig] +} diff --git a/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala b/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala index a70ad4606..1d47cd9c9 100644 --- a/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala +++ b/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala @@ -18,8 +18,108 @@ */ package com.snowplowanalytics.snowplow.collectors.scalastream -import com.snowplowanalytics.snowplow.collectors.scalastream.config.ConfigSpec +import cats.effect.testing.specs2.CatsEffect +import cats.effect.{ExitCode, IO} +import com.snowplowanalytics.snowplow.collector.core.{Config, ConfigParser} +import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.NsqSinkConfig +import org.http4s.SameSite +import org.specs2.mutable.Specification -class NsqConfigSpec extends ConfigSpec { - makeConfigTest("nsq", "", "") +import java.nio.file.Paths +import scala.concurrent.duration.DurationInt + +class NsqConfigSpec extends Specification with CatsEffect { + + "Config parser" should { + "be able to parse extended nsq config" in { + assert( + resource = "/config.nsq.extended.hocon", + expectedResult = Right(NsqConfigSpec.expectedConfig) + ) + } + "be able to parse minimal nsq config" in { + assert( + resource = "/config.nsq.minimal.hocon", + expectedResult = Right(NsqConfigSpec.expectedConfig) + ) + } + } + + private def assert(resource: String, expectedResult: Either[ExitCode, Config[NsqSinkConfig]]) = { + val path = Paths.get(getClass.getResource(resource).toURI) + ConfigParser.fromPath[IO, NsqSinkConfig](Some(path)).value.map { result => + result must beEqualTo(expectedResult) + } + } +} + +object NsqConfigSpec { + private val expectedConfig = Config[NsqSinkConfig]( + interface = "0.0.0.0", + port = 8080, + paths = Map.empty[String, String], + p3p = Config.P3P( + policyRef = "/w3c/p3p.xml", + CP = "NOI DSP COR NID PSA OUR IND COM NAV STA" + ), + crossDomain = Config.CrossDomain( + enabled = false, + domains = List("*"), + secure = true + ), + cookie = Config.Cookie( + enabled = true, + expiration = 365.days, + name = "sp", + domains = List.empty, + fallbackDomain = None, + secure = true, + httpOnly = true, + sameSite = Some(SameSite.None) + ), + doNotTrackCookie = Config.DoNotTrackCookie( + enabled = false, + name = "", + value = "" + ), + cookieBounce = Config.CookieBounce( + enabled = false, + name = "n3pc", + fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000", + forwardedProtocolHeader = None + ), + redirectMacro = Config.RedirectMacro( + enabled = false, + placeholder = None + ), + rootResponse = Config.RootResponse( + enabled = false, + statusCode = 302, + headers = Map.empty[String, String], + body = "" + ), + cors = Config.CORS(1.hour), + monitoring = + Config.Monitoring(Config.Metrics(Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector"))), + ssl = Config.SSL(enable = false, redirect = false, port = 443), + enableDefaultRedirect = false, + redirectDomains = Set.empty, + preTerminationPeriod = 10.seconds, + streams = Config.Streams( + good = "good", + bad = "bad", + useIpAddressAsPartitionKey = false, + buffer = Config.Buffer( + byteLimit = 3145728, + recordLimit = 500, + timeLimit = 5000 + ), + sink = NsqSinkConfig( + maxBytes = 1000000, + threadPoolSize = 10, + host = "nsqHost", + port = 4150 + ) + ) + ) } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index f81ff002b..32d29be7d 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -56,6 +56,7 @@ object Dependencies { val circeConfig = "0.10.0" val fs2PubSub = "0.22.0" val catsRetry = "3.1.0" + val nettyAll = "4.1.95.Final" // to fix nsq dependency // Scala (test only) val specs2 = "4.11.0" @@ -72,6 +73,7 @@ object Dependencies { object Libraries { // Java val jackson = "com.fasterxml.jackson.core" % "jackson-databind" % V.jackson // nsq only + val nettyAll = "io.netty" % "netty-all" % V.nettyAll //nsq only val thrift = "org.apache.thrift" % "libthrift" % V.thrift val kinesis = "com.amazonaws" % "aws-java-sdk-kinesis" % V.awsSdk val sqs = "com.amazonaws" % "aws-java-sdk-sqs" % V.awsSdk