
Commit f61b289

Wrap nsq sink with cats effects (close #348)

colmsnowplow committed Sep 13, 2023
1 parent b8f8ff8 commit f61b289
Showing 7 changed files with 228 additions and 55 deletions.
9 changes: 6 additions & 3 deletions build.sbt
@@ -261,27 +261,30 @@ lazy val kafkaDistroless = project
.dependsOn(core % "test->test;compile->compile")

lazy val nsqSettings =
allSettings ++ buildInfoSettings ++ Seq(
allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Seq(
moduleName := "snowplow-stream-collector-nsq",
Docker / packageName := "scala-stream-collector-nsq",
buildInfoPackage := s"com.snowplowanalytics.snowplow.collectors.scalastream",
libraryDependencies ++= Seq(
Dependencies.Libraries.catsRetry,
Dependencies.Libraries.nsqClient,
Dependencies.Libraries.jackson,
Dependencies.Libraries.nettyAll,
Dependencies.Libraries.log4j
)
)

lazy val nsq = project
.settings(nsqSettings)
.enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)
.dependsOn(core % "test->test;compile->compile")
.dependsOn(http4s % "test->test;compile->compile")

lazy val nsqDistroless = project
.in(file("distroless/nsq"))
.settings(sourceDirectory := (nsq / sourceDirectory).value)
.settings(nsqSettings)
.enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)
.dependsOn(core % "test->test;compile->compile")
.dependsOn(http4s % "test->test;compile->compile")

lazy val stdoutSettings =
allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Seq(
23 changes: 0 additions & 23 deletions nsq/src/main/resources/application.conf
@@ -14,26 +14,3 @@ collector {
}
}
}


akka {
loglevel = WARNING
loggers = ["akka.event.slf4j.Slf4jLogger"]

http.server {
remote-address-header = on
raw-request-uri-header = on

parsing {
max-uri-length = 32768
uri-parsing-mode = relaxed
illegal-header-warnings = off
}

max-connections = 2048
}

coordinated-shutdown {
run-by-jvm-shutdown-hook = off
}
}
NsqCollector.scala
@@ -14,28 +14,29 @@
*/
package com.snowplowanalytics.snowplow.collectors.scalastream

import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo
import com.snowplowanalytics.snowplow.collectors.scalastream.model._
import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.NsqSink
import com.snowplowanalytics.snowplow.collectors.scalastream.telemetry.TelemetryAkkaService
import java.util.concurrent.ScheduledThreadPoolExecutor

object NsqCollector extends Collector {
def appName = BuildInfo.shortName
def appVersion = BuildInfo.version
def scalaVersion = BuildInfo.scalaVersion
import cats.effect.{IO, Resource}
import com.snowplowanalytics.snowplow.collector.core.model.Sinks
import com.snowplowanalytics.snowplow.collector.core.{App, Config}
import com.snowplowanalytics.snowplow.collectors.scalastream.sinks._

def main(args: Array[String]): Unit = {
val (collectorConf, akkaConf) = parseConfig(args)
val telemetry = TelemetryAkkaService.initWithCollector(collectorConf, BuildInfo.moduleName, appVersion)
val sinks = {
val goodStream = collectorConf.streams.good
val badStream = collectorConf.streams.bad
val (good, bad) = collectorConf.streams.sink match {
case nc: Nsq => (new NsqSink(nc.maxBytes, nc, goodStream), new NsqSink(nc.maxBytes, nc, badStream))
case _ => throw new IllegalArgumentException("Configured sink is not NSQ")
}
CollectorSinks(good, bad)
}
run(collectorConf, akkaConf, sinks, telemetry)
object NsqCollector extends App[NsqSinkConfig](BuildInfo) {
override def mkSinks(config: Config.Streams[NsqSinkConfig]): Resource[IO, Sinks[IO]] = {
val threadPoolExecutor = new ScheduledThreadPoolExecutor(config.sink.threadPoolSize)
for {
good <- NsqSink.create[IO](
config.sink.maxBytes,
config.sink,
config.good,
threadPoolExecutor
)
bad <- NsqSink.create[IO](
config.sink.maxBytes,
config.sink,
config.bad,
threadPoolExecutor
)
} yield Sinks(good, bad)
}
}
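
The new `NsqCollector` delegates startup and shutdown to `App` and builds both sinks inside a single `for` comprehension over `Resource`, so they are acquired in order and released in reverse. A minimal, illustrative sketch of that behaviour (the names below are not from the commit):

```scala
import cats.effect.{IO, IOApp, Resource}

object SinkOrderDemo extends IOApp.Simple {
  // Illustrative stand-in for a sink resource: acquisition runs top-down,
  // release runs bottom-up, mirroring the mkSinks comprehension above.
  def sink(name: String): Resource[IO, String] =
    Resource.make(IO.println(s"open $name").as(name))(n => IO.println(s"close $n"))

  val sinks: Resource[IO, (String, String)] =
    for {
      good <- sink("good")
      bad  <- sink("bad")
    } yield (good, bad)

  // Prints: open good, open bad, using both sinks, close bad, close good
  def run: IO[Unit] = sinks.use(_ => IO.println("using both sinks"))
}
```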
NsqSink.scala
@@ -19,28 +19,82 @@
package com.snowplowanalytics.snowplow.collectors.scalastream
package sinks

import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeoutException

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContextExecutorService
import scala.concurrent.duration.MILLISECONDS

import cats.effect.{Resource, Sync}

import com.snowplowanalytics.client.nsq.NSQProducer
import com.snowplowanalytics.snowplow.collectors.scalastream.model._
import com.snowplowanalytics.snowplow.collector.core.{Sink}
import com.snowplowanalytics.client.nsq.exceptions.NSQException

/**
* NSQ Sink for the Scala Stream Collector
* @param nsqConfig Configuration for Nsq
* @param topicName Nsq topic name
*/
class NsqSink(val maxBytes: Int, nsqConfig: Nsq, topicName: String) extends Sink {
class NsqSink[F[_]: Sync] private (
val maxBytes: Int,
nsqConfig: NsqSinkConfig,
topicName: String,
executorService: ScheduledExecutorService
) extends Sink[F] {

// private lazy val log = LoggerFactory.getLogger(getClass())

implicit lazy val ec: ExecutionContextExecutorService =
concurrent.ExecutionContext.fromExecutorService(executorService)

var healthStatus = true

override def isHealthy: F[Boolean] = Sync[F].pure(healthStatus) //TODO: Figure out this implementation

private val producer = new NSQProducer().addAddress(nsqConfig.host, nsqConfig.port).start()

def produceData(topicName: String, events: List[Array[Byte]]): Unit =
try {
producer.produceMulti(topicName, events.asJava)
} catch {
case e @ (_: NSQException | _: TimeoutException) => {
healthStatus = false
throw e
}
}

/**
* Store raw events to the topic
* @param events The list of events to send
* @param key The partition key (unused)
*/
override def storeRawEvents(events: List[Array[Byte]], key: String): Unit =
producer.produceMulti(topicName, events.asJava)
override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] =
Sync[F].delay(produceData(topicName, events))

override def shutdown(): Unit =
def shutdown(): Unit =
producer.shutdown()
executorService.shutdown()
executorService.awaitTermination(10000, MILLISECONDS)
()
}

object NsqSink {

def create[F[_]: Sync](
maxBytes: Int,
nsqConfig: NsqSinkConfig,
// bufferConfig: Config.Buffer,
topicName: String,
executorService: ScheduledExecutorService
): Resource[F, NsqSink[F]] = {
val acquire = Sync[F].delay(
new NsqSink(maxBytes, nsqConfig, topicName, executorService)
)

val release = (sink: NsqSink[F]) => (Sync[F].delay(sink.shutdown()))

Resource.make(acquire)(release)
}
}
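
`isHealthy` still reads a plain mutable `var`, as the `//TODO` in the class acknowledges. One hedged alternative, not part of this commit, is a `Ref`-backed flag, so the producer's failure path and the health endpoint communicate through a pure, thread-safe cell:

```scala
import cats.effect.{IO, Ref}

// Sketch only: a Ref-backed health flag that could replace `var healthStatus`.
final class HealthFlag private (ref: Ref[IO, Boolean]) {
  val isHealthy: IO[Boolean]  = ref.get
  val markUnhealthy: IO[Unit] = ref.set(false)
  val markHealthy: IO[Unit]   = ref.set(true)
}

object HealthFlag {
  // Ref.of allocates the mutable cell inside IO, keeping construction pure.
  val create: IO[HealthFlag] = Ref.of[IO, Boolean](true).map(new HealthFlag(_))
}
```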
NsqSinkConfig.scala
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2013-2022 Snowplow Analytics Ltd.
* All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache
* License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied.
*
* See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/

package com.snowplowanalytics.snowplow.collectors.scalastream.sinks

import io.circe.Decoder
import io.circe.generic.semiauto._

import com.snowplowanalytics.snowplow.collector.core.Config

final case class NsqSinkConfig(
maxBytes: Int,
threadPoolSize: Int,
host: String,
port: Int
) extends Config.Sink

object NsqSinkConfig {
implicit val configDecoder: Decoder[NsqSinkConfig] = deriveDecoder[NsqSinkConfig]
}
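
`deriveDecoder` keys the decoder on the case-class field names, so a JSON (or resolved HOCON) object with matching keys decodes directly. An illustrative sketch, assuming it sits in the same package as `NsqSinkConfig` and using made-up values:

```scala
import io.circe.parser.decode

object NsqSinkConfigDecodeDemo {
  val json: String =
    """{ "maxBytes": 1000000, "threadPoolSize": 10, "host": "nsqHost", "port": 4150 }"""

  // Picks up the implicit Decoder[NsqSinkConfig] derived above.
  val parsed: Either[io.circe.Error, NsqSinkConfig] = decode[NsqSinkConfig](json)
  // parsed == Right(NsqSinkConfig(1000000, 10, "nsqHost", 4150))
}
```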
NsqConfigSpec.scala
@@ -18,8 +18,108 @@
*/
package com.snowplowanalytics.snowplow.collectors.scalastream

import com.snowplowanalytics.snowplow.collectors.scalastream.config.ConfigSpec
import cats.effect.testing.specs2.CatsEffect
import cats.effect.{ExitCode, IO}
import com.snowplowanalytics.snowplow.collector.core.{Config, ConfigParser}
import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.NsqSinkConfig
import org.http4s.SameSite
import org.specs2.mutable.Specification

class NsqConfigSpec extends ConfigSpec {
makeConfigTest("nsq", "", "")
import java.nio.file.Paths
import scala.concurrent.duration.DurationInt

class NsqConfigSpec extends Specification with CatsEffect {

"Config parser" should {
"be able to parse extended nsq config" in {
assert(
resource = "/config.nsq.extended.hocon",
expectedResult = Right(NsqConfigSpec.expectedConfig)
)
}
"be able to parse minimal nsq config" in {
assert(
resource = "/config.nsq.minimal.hocon",
expectedResult = Right(NsqConfigSpec.expectedConfig)
)
}
}

private def assert(resource: String, expectedResult: Either[ExitCode, Config[NsqSinkConfig]]) = {
val path = Paths.get(getClass.getResource(resource).toURI)
ConfigParser.fromPath[IO, NsqSinkConfig](Some(path)).value.map { result =>
result must beEqualTo(expectedResult)
}
}
}

object NsqConfigSpec {
private val expectedConfig = Config[NsqSinkConfig](
interface = "0.0.0.0",
port = 8080,
paths = Map.empty[String, String],
p3p = Config.P3P(
policyRef = "/w3c/p3p.xml",
CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
),
crossDomain = Config.CrossDomain(
enabled = false,
domains = List("*"),
secure = true
),
cookie = Config.Cookie(
enabled = true,
expiration = 365.days,
name = "sp",
domains = List.empty,
fallbackDomain = None,
secure = true,
httpOnly = true,
sameSite = Some(SameSite.None)
),
doNotTrackCookie = Config.DoNotTrackCookie(
enabled = false,
name = "",
value = ""
),
cookieBounce = Config.CookieBounce(
enabled = false,
name = "n3pc",
fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000",
forwardedProtocolHeader = None
),
redirectMacro = Config.RedirectMacro(
enabled = false,
placeholder = None
),
rootResponse = Config.RootResponse(
enabled = false,
statusCode = 302,
headers = Map.empty[String, String],
body = ""
),
cors = Config.CORS(1.hour),
monitoring =
Config.Monitoring(Config.Metrics(Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector"))),
ssl = Config.SSL(enable = false, redirect = false, port = 443),
enableDefaultRedirect = false,
redirectDomains = Set.empty,
preTerminationPeriod = 10.seconds,
streams = Config.Streams(
good = "good",
bad = "bad",
useIpAddressAsPartitionKey = false,
buffer = Config.Buffer(
byteLimit = 3145728,
recordLimit = 500,
timeLimit = 5000
),
sink = NsqSinkConfig(
maxBytes = 1000000,
threadPoolSize = 10,
host = "nsqHost",
port = 4150
)
)
)
}
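
Both fixtures are checked against the same `expectedConfig`, which holds as long as the minimal HOCON leaves everything except the required keys to the bundled defaults. The parser entry point the test exercises can also be called directly; a sketch using only the API surface shown above, with a hypothetical path:

```scala
import java.nio.file.Paths

import cats.effect.{ExitCode, IO}

import com.snowplowanalytics.snowplow.collector.core.ConfigParser
import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.NsqSinkConfig

object ConfigLoadSketch {
  // Parse a config file, falling back to the parser's exit code on failure.
  def loadConfig(pathStr: String): IO[ExitCode] =
    ConfigParser
      .fromPath[IO, NsqSinkConfig](Some(Paths.get(pathStr)))
      .value
      .flatMap {
        case Right(cfg) => IO.println(cfg.streams.sink).as(ExitCode.Success)
        case Left(code) => IO.pure(code)
      }
}
```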
2 changes: 2 additions & 0 deletions project/Dependencies.scala
@@ -56,6 +56,7 @@ object Dependencies {
val circeConfig = "0.10.0"
val fs2PubSub = "0.22.0"
val catsRetry = "3.1.0"
val nettyAll = "4.1.95.Final" // to fix nsq dependency

// Scala (test only)
val specs2 = "4.11.0"
@@ -72,6 +73,7 @@
object Libraries {
// Java
val jackson = "com.fasterxml.jackson.core" % "jackson-databind" % V.jackson // nsq only
val nettyAll = "io.netty" % "netty-all" % V.nettyAll //nsq only
val thrift = "org.apache.thrift" % "libthrift" % V.thrift
val kinesis = "com.amazonaws" % "aws-java-sdk-kinesis" % V.awsSdk
val sqs = "com.amazonaws" % "aws-java-sdk-sqs" % V.awsSdk