From 7fb51e3ee5ae97ffa6e6a8f55915246aee159ad0 Mon Sep 17 00:00:00 2001 From: spenes Date: Tue, 19 Sep 2023 16:46:22 +0300 Subject: [PATCH] Add telemetry support (close #381) --- build.sbt | 15 ++- examples/config.kinesis.extended.hocon | 7 ++ http4s/src/main/resources/reference.conf | 9 ++ .../App.scala | 6 +- .../AppInfo.scala | 1 + .../Config.scala | 21 +++- .../Run.scala | 42 +++++-- .../Telemetry.scala | 118 ++++++++++++++++++ .../TestUtils.scala | 16 ++- .../KinesisCollector.scala | 8 +- .../sinks/KinesisConfigSpec.scala | 32 ++++- project/Dependencies.scala | 41 +++--- .../PubSubCollector.scala | 8 +- .../ConfigSpec.scala | 13 ++ .../SqsCollector.scala | 10 +- .../SqsConfigSpec.scala | 13 ++ .../StdoutCollector.scala | 6 +- 17 files changed, 318 insertions(+), 48 deletions(-) create mode 100644 http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Telemetry.scala diff --git a/build.sbt b/build.sbt index cc8ae3d93..9ca57ab4e 100644 --- a/build.sbt +++ b/build.sbt @@ -27,17 +27,17 @@ lazy val commonDependencies = Seq( Dependencies.Libraries.badRows, Dependencies.Libraries.collectorPayload, Dependencies.Libraries.pureconfig, - Dependencies.Libraries.trackerCore, - Dependencies.Libraries.trackerEmitterId, + Dependencies.Libraries.Legacy.trackerCore, + Dependencies.Libraries.Legacy.trackerEmitterId, // Unit tests Dependencies.Libraries.akkaTestkit, Dependencies.Libraries.akkaHttpTestkit, Dependencies.Libraries.akkaStreamTestkit, Dependencies.Libraries.specs2, // Integration tests - Dependencies.Libraries.LegacyIT.testcontainers, - Dependencies.Libraries.LegacyIT.http4sClient, - Dependencies.Libraries.LegacyIT.catsRetry + Dependencies.Libraries.Legacy.testcontainers, + Dependencies.Libraries.Legacy.http4sClient, + Dependencies.Libraries.Legacy.catsRetry ) lazy val commonExclusions = Seq( @@ -95,7 +95,7 @@ lazy val dynVerSettings = Seq( ) lazy val http4sBuildInfoSettings = Seq( - buildInfoKeys := Seq[BuildInfoKey](name, dockerAlias, version), + buildInfoKeys := Seq[BuildInfoKey](name, moduleName, dockerAlias, version), buildInfoOptions += BuildInfoOption.Traits("com.snowplowanalytics.snowplow.collector.core.AppInfo") ) @@ -128,6 +128,7 @@ lazy val http4s = project Dependencies.Libraries.http4sEmber, Dependencies.Libraries.http4sBlaze, Dependencies.Libraries.http4sNetty, + Dependencies.Libraries.http4sClient, Dependencies.Libraries.log4cats, Dependencies.Libraries.thrift, Dependencies.Libraries.badRows, @@ -136,6 +137,8 @@ lazy val http4s = project Dependencies.Libraries.decline, Dependencies.Libraries.circeGeneric, Dependencies.Libraries.circeConfig, + Dependencies.Libraries.trackerCore, + Dependencies.Libraries.emitterHttps, Dependencies.Libraries.specs2, Dependencies.Libraries.specs2CE, diff --git a/examples/config.kinesis.extended.hocon b/examples/config.kinesis.extended.hocon index 9a54621ee..28e20fdee 100644 --- a/examples/config.kinesis.extended.hocon +++ b/examples/config.kinesis.extended.hocon @@ -270,6 +270,13 @@ collector { url = telemetry-g.snowplowanalytics.com port = 443 secure = true + + # Identifiers used by collector terraform module + userProvidedId = my_pipeline, + moduleName = collector-kinesis-ec2, + moduleVersion = 0.5.2, + instanceId = 665bhft5u6udjf, + autoGeneratedId = hfy67e5ydhtrd } monitoring.metrics.statsd { diff --git a/http4s/src/main/resources/reference.conf b/http4s/src/main/resources/reference.conf index 9bee1be0c..9ae8f6849 100644 --- a/http4s/src/main/resources/reference.conf +++ b/http4s/src/main/resources/reference.conf @@ -59,6 +59,15 @@ } } + telemetry { + disable = false + interval = 60 minutes + method = POST + url = telemetry-g.snowplowanalytics.com + port = 443 + secure = true + } + monitoring { metrics { statsd { diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala index cb69be2f9..5bbce5762 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala @@ -8,6 +8,8 @@ import com.monovore.decline.Opts import io.circe.Decoder +import com.snowplowanalytics.snowplow.scalatracker.emitters.http4s.ceTracking + import com.snowplowanalytics.snowplow.collector.core.model.Sinks abstract class App[SinkConfig <: Config.Sink: Decoder](appInfo: AppInfo) @@ -19,7 +21,9 @@ abstract class App[SinkConfig <: Config.Sink: Decoder](appInfo: AppInfo) def mkSinks(config: Config.Streams[SinkConfig]): Resource[IO, Sinks[IO]] - final def main: Opts[IO[ExitCode]] = Run.fromCli[IO, SinkConfig](appInfo, mkSinks) + def telemetryInfo(config: Config[SinkConfig]): Telemetry.TelemetryInfo + + final def main: Opts[IO[ExitCode]] = Run.fromCli[IO, SinkConfig](appInfo, mkSinks, telemetryInfo) } object App { diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/AppInfo.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/AppInfo.scala index 1215a8149..837252b72 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/AppInfo.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/AppInfo.scala @@ -2,6 +2,7 @@ package com.snowplowanalytics.snowplow.collector.core trait AppInfo { def name: String + def moduleName: String def version: String def dockerAlias: String } diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala index f274ccf1c..62e4c0d07 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala @@ -1,6 +1,6 @@ package com.snowplowanalytics.snowplow.collector.core -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ import io.circe.config.syntax._ import io.circe.generic.semiauto._ @@ -23,6 +23,7 @@ case class Config[+SinkConfig]( cors: Config.CORS, streams: Config.Streams[SinkConfig], monitoring: Config.Monitoring, + telemetry: Config.Telemetry, ssl: Config.SSL, enableDefaultRedirect: Boolean, redirectDomains: Set[String], @@ -122,6 +123,23 @@ object Config { port: Int ) + final case class Telemetry( + // General params + disable: Boolean, + interval: FiniteDuration, + // http params + method: String, + url: String, + port: Int, + secure: Boolean, + // Params injected by deployment scripts + userProvidedId: Option[String], + moduleName: Option[String], + moduleVersion: Option[String], + instanceId: Option[String], + autoGeneratedId: Option[String] + ) + implicit def decoder[SinkConfig: Decoder]: Decoder[Config[SinkConfig]] = { implicit val p3p = deriveDecoder[P3P] implicit val crossDomain = deriveDecoder[CrossDomain] @@ -147,6 +165,7 @@ object Config { implicit val metrics = deriveDecoder[Metrics] implicit val monitoring = deriveDecoder[Monitoring] implicit val ssl = deriveDecoder[SSL] + implicit val telemetry = deriveDecoder[Telemetry] deriveDecoder[Config[SinkConfig]] } diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala index a221fdfec..92839bac0 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala @@ -13,32 +13,38 @@ import cats.data.EitherT import cats.effect.{Async, ExitCode, Sync} import cats.effect.kernel.Resource +import org.http4s.blaze.client.BlazeClientBuilder + import com.monovore.decline.Opts import io.circe.Decoder +import com.snowplowanalytics.snowplow.scalatracker.Tracking + import com.snowplowanalytics.snowplow.collector.core.model.Sinks object Run { implicit private def logger[F[_]: Sync] = Slf4jLogger.getLogger[F] - def fromCli[F[_]: Async, SinkConfig: Decoder]( + def fromCli[F[_]: Async: Tracking, SinkConfig: Decoder]( appInfo: AppInfo, - mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]] + mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]], + telemetryInfo: Config[SinkConfig] => Telemetry.TelemetryInfo ): Opts[F[ExitCode]] = { val configPath = Opts.option[Path]("config", "Path to HOCON configuration (optional)", "c", "config.hocon").orNone - configPath.map(fromPath[F, SinkConfig](appInfo, mkSinks, _)) + configPath.map(fromPath[F, SinkConfig](appInfo, mkSinks, telemetryInfo, _)) } - private def fromPath[F[_]: Async, SinkConfig: Decoder]( + private def fromPath[F[_]: Async: Tracking, SinkConfig: Decoder]( appInfo: AppInfo, mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]], + telemetryInfo: Config[SinkConfig] => Telemetry.TelemetryInfo, path: Option[Path] ): F[ExitCode] = { val eitherT = for { config <- ConfigParser.fromPath[F, SinkConfig](path) - _ <- EitherT.right[ExitCode](fromConfig(appInfo, mkSinks, config)) + _ <- EitherT.right[ExitCode](fromConfig(appInfo, mkSinks, config, telemetryInfo)) } yield ExitCode.Success eitherT.merge.handleErrorWith { e => @@ -47,10 +53,11 @@ object Run { } } - private def fromConfig[F[_]: Async, SinkConfig]( + private def fromConfig[F[_]: Async: Tracking, SinkConfig]( appInfo: AppInfo, mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]], - config: Config[SinkConfig] + config: Config[SinkConfig], + telemetryInfo: Config[SinkConfig] => Telemetry.TelemetryInfo ): F[ExitCode] = { val resources = for { sinks <- mkSinks(config.streams) @@ -65,10 +72,23 @@ object Run { if (config.ssl.enable) config.ssl.port else config.port, config.ssl.enable ) - _ <- withGracefulShutdown(config.preTerminationPeriod)(httpServer) - } yield () - - resources.surround(Async[F].never[ExitCode]) + _ <- withGracefulShutdown(config.preTerminationPeriod)(httpServer) + httpClient <- BlazeClientBuilder[F].resource + } yield httpClient + + resources.use { httpClient => + Telemetry + .run( + config.telemetry, + httpClient, + appInfo, + telemetryInfo(config).region, + telemetryInfo(config).cloud + ) + .compile + .drain + .flatMap(_ => Async[F].never[ExitCode]) + } } private def prettyLogException[F[_]: Sync](e: Throwable): F[Unit] = { diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Telemetry.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Telemetry.scala new file mode 100644 index 000000000..95df9bebc --- /dev/null +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Telemetry.scala @@ -0,0 +1,118 @@ +package com.snowplowanalytics.snowplow.collector.core + +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger + +import cats.data.NonEmptyList +import cats.implicits._ + +import cats.effect.{Async, Resource, Sync} +import cats.effect.std.Random + +import fs2.Stream + +import org.http4s.client.{Client => HttpClient} + +import _root_.io.circe.Json +import _root_.io.circe.syntax._ + +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} + +import com.snowplowanalytics.snowplow.scalatracker.{Tracker, Tracking} +import com.snowplowanalytics.snowplow.scalatracker.Emitter._ +import com.snowplowanalytics.snowplow.scalatracker.Emitter.{Result => TrackerResult} +import com.snowplowanalytics.snowplow.scalatracker.emitters.http4s.Http4sEmitter + +object Telemetry { + + implicit private def unsafeLogger[F[_]: Sync]: Logger[F] = + Slf4jLogger.getLogger[F] + + def run[F[_]: Async: Tracking]( + telemetryConfig: Config.Telemetry, + httpClient: HttpClient[F], + appInfo: AppInfo, + region: Option[String], + cloud: Option[String] + ): Stream[F, Unit] = + if (telemetryConfig.disable) + Stream.empty.covary[F] + else { + val sdj = makeHeartbeatEvent( + telemetryConfig, + region, + cloud, + appInfo.moduleName, + appInfo.version + ) + Stream.resource(initTracker(telemetryConfig, appInfo.moduleName, httpClient)).flatMap { tracker => + Stream.fixedDelay[F](telemetryConfig.interval).evalMap { _ => + tracker.trackSelfDescribingEvent(unstructEvent = sdj) >> tracker.flushEmitters() + } + } + } + + private def initTracker[F[_]: Async: Tracking]( + config: Config.Telemetry, + appName: String, + client: HttpClient[F] + ): Resource[F, Tracker[F]] = + for { + random <- Resource.eval(Random.scalaUtilRandom[F]) + emitter <- { + implicit val r: Random[F] = random + Http4sEmitter.build( + EndpointParams(config.url, port = Some(config.port), https = config.secure), + client, + retryPolicy = RetryPolicy.MaxAttempts(10), + callback = Some(emitterCallback[F] _) + ) + } + } yield new Tracker(NonEmptyList.of(emitter), "tracker-telemetry", appName) + + private def emitterCallback[F[_]: Sync]( + params: EndpointParams, + req: Request, + res: TrackerResult + ): F[Unit] = + res match { + case TrackerResult.Success(_) => + Logger[F].debug(s"Telemetry heartbeat successfully sent to ${params.getGetUri}") + case TrackerResult.Failure(code) => + Logger[F].warn(s"Sending telemetry hearbeat got unexpected HTTP code $code from ${params.getUri}") + case TrackerResult.TrackerFailure(exception) => + Logger[F].warn( + s"Telemetry hearbeat failed to reach ${params.getUri} with following exception $exception after ${req.attempt} attempts" + ) + case TrackerResult.RetriesExceeded(failure) => + Logger[F].error(s"Stopped trying to send telemetry heartbeat after following failure: $failure") + } + + private def makeHeartbeatEvent( + teleCfg: Config.Telemetry, + region: Option[String], + cloud: Option[String], + appName: String, + appVersion: String + ): SelfDescribingData[Json] = + SelfDescribingData( + SchemaKey("com.snowplowanalytics.oss", "oss_context", "jsonschema", SchemaVer.Full(1, 0, 1)), + Json.obj( + "userProvidedId" -> teleCfg.userProvidedId.asJson, + "autoGeneratedId" -> teleCfg.autoGeneratedId.asJson, + "moduleName" -> teleCfg.moduleName.asJson, + "moduleVersion" -> teleCfg.moduleVersion.asJson, + "instanceId" -> teleCfg.instanceId.asJson, + "appGeneratedId" -> java.util.UUID.randomUUID.toString.asJson, + "cloud" -> cloud.asJson, + "region" -> region.asJson, + "applicationName" -> appName.asJson, + "applicationVersion" -> appVersion.asJson + ) + ) + + case class TelemetryInfo( + region: Option[String], + cloud: Option[String] + ) +} diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala index 6ef978288..c465521ce 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala @@ -14,6 +14,7 @@ object TestUtils { val appInfo = new AppInfo { def name = appName + def moduleName = appName def version = appVersion def dockerAlias = "docker run collector" } @@ -102,6 +103,19 @@ object TestUtils { ), enableDefaultRedirect = false, redirectDomains = Set.empty[String], - preTerminationPeriod = 10.seconds + preTerminationPeriod = 10.seconds, + telemetry = Config.Telemetry( + disable = false, + interval = 60.minutes, + method = "POST", + url = "telemetry-g.snowplowanalytics.com", + port = 443, + secure = true, + userProvidedId = None, + moduleName = None, + moduleVersion = None, + instanceId = None, + autoGeneratedId = None + ) ) } diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala index ca45f2dc6..e4ab86fd8 100644 --- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala +++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala @@ -10,7 +10,7 @@ package com.snowplowanalytics.snowplow.collectors.scalastream import cats.effect.{IO, Resource} import com.snowplowanalytics.snowplow.collector.core.model.Sinks -import com.snowplowanalytics.snowplow.collector.core.{App, Config} +import com.snowplowanalytics.snowplow.collector.core.{App, Config, Telemetry} import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.{KinesisSink, KinesisSinkConfig} import org.slf4j.LoggerFactory @@ -42,6 +42,12 @@ object KinesisCollector extends App[KinesisSinkConfig](BuildInfo) { } yield Sinks(good, bad) } + override def telemetryInfo(config: Config[KinesisSinkConfig]): Telemetry.TelemetryInfo = + Telemetry.TelemetryInfo( + region = Some(config.streams.sink.region), + cloud = Some("AWS") + ) + def buildExecutorService(kc: KinesisSinkConfig): ScheduledThreadPoolExecutor = { log.info("Creating thread pool of size " + kc.threadPoolSize) new ScheduledThreadPoolExecutor(kc.threadPoolSize) diff --git a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala index d08ea3ba0..31d6c77cb 100644 --- a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala +++ b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala @@ -22,8 +22,23 @@ class KinesisConfigSpec extends Specification with CatsEffect { "Config parser" should { "be able to parse extended kinesis config" in { assert( - resource = "/config.kinesis.extended.hocon", - expectedResult = Right(KinesisConfigSpec.expectedConfig) + resource = "/config.kinesis.extended.hocon", + expectedResult = Right( + KinesisConfigSpec + .expectedConfig + .copy( + telemetry = KinesisConfigSpec + .expectedConfig + .telemetry + .copy( + userProvidedId = Some("my_pipeline"), + moduleName = Some("collector-kinesis-ec2"), + moduleVersion = Some("0.5.2"), + instanceId = Some("665bhft5u6udjf"), + autoGeneratedId = Some("hfy67e5ydhtrd") + ) + ) + ) ) } "be able to parse minimal kinesis config" in { @@ -123,6 +138,19 @@ object KinesisConfigSpec { customEndpoint = None, startupCheckInterval = 1.second ) + ), + telemetry = Config.Telemetry( + disable = false, + interval = 60.minutes, + method = "POST", + url = "telemetry-g.snowplowanalytics.com", + port = 443, + secure = true, + userProvidedId = None, + moduleName = None, + moduleVersion = None, + instanceId = None, + autoGeneratedId = None ) ) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index b3f0771a2..187e393e2 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -34,7 +34,7 @@ object Dependencies { val protobuf = "3.21.7" // force this version to mitigate security vulnerabilities // Scala val collectorPayload = "0.0.0" - val tracker = "1.0.1" + val tracker = "2.0.0" val akkaHttp = "10.2.7" val akka = "2.6.16" val scopt = "4.0.1" @@ -56,10 +56,11 @@ object Dependencies { val specs2CE = "1.5.0" val testcontainers = "0.40.10" - object LegacyIT { + object Legacy { val specs2CE = "0.4.1" val catsRetry = "2.1.0" val http4s = "0.21.33" + val tracker = "1.0.1" } } @@ -84,23 +85,24 @@ object Dependencies { val protobuf = "com.google.protobuf" % "protobuf-java" % V.protobuf // Scala - val collectorPayload = "com.snowplowanalytics" % "collector-payload-1" % V.collectorPayload - val badRows = "com.snowplowanalytics" %% "snowplow-badrows" % V.badRows - val trackerCore = "com.snowplowanalytics" %% "snowplow-scala-tracker-core" % V.tracker - val trackerEmitterId = "com.snowplowanalytics" %% "snowplow-scala-tracker-emitter-id" % V.tracker - val scopt = "com.github.scopt" %% "scopt" % V.scopt - val akkaHttp = "com.typesafe.akka" %% "akka-http" % V.akkaHttp - val akkaStream = "com.typesafe.akka" %% "akka-stream" % V.akka - val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % V.akka - val pureconfig = "com.github.pureconfig" %% "pureconfig" % V.pureconfig - val akkaHttpMetrics = "fr.davit" %% "akka-http-metrics-datadog" % V.akkaHttpMetrics - val log4cats = "org.typelevel" %% "log4cats-slf4j" % V.log4cats + val collectorPayload = "com.snowplowanalytics" % "collector-payload-1" % V.collectorPayload + val badRows = "com.snowplowanalytics" %% "snowplow-badrows" % V.badRows + val trackerCore = "com.snowplowanalytics" %% "snowplow-scala-tracker-core" % V.tracker + val emitterHttps = "com.snowplowanalytics" %% "snowplow-scala-tracker-emitter-http4s" % V.tracker + val scopt = "com.github.scopt" %% "scopt" % V.scopt + val akkaHttp = "com.typesafe.akka" %% "akka-http" % V.akkaHttp + val akkaStream = "com.typesafe.akka" %% "akka-stream" % V.akka + val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % V.akka + val pureconfig = "com.github.pureconfig" %% "pureconfig" % V.pureconfig + val akkaHttpMetrics = "fr.davit" %% "akka-http-metrics-datadog" % V.akkaHttpMetrics + val log4cats = "org.typelevel" %% "log4cats-slf4j" % V.log4cats // http4s val http4sDsl = "org.http4s" %% "http4s-dsl" % V.http4s val http4sEmber = "org.http4s" %% "http4s-ember-server" % V.http4s val http4sBlaze = "org.http4s" %% "http4s-blaze-server" % V.blaze val http4sNetty = "org.http4s" %% "http4s-netty-server" % V.http4sNetty + val http4sClient = "org.http4s" %% "http4s-blaze-client" % V.blaze val decline = "com.monovore" %% "decline-effect" % V.decline val circeGeneric = "io.circe" %% "circe-generic" % V.circe val circeConfig = "io.circe" %% "circe-config" % V.circeConfig @@ -111,7 +113,7 @@ object Dependencies { // Test common val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test - val specs2CE = "org.typelevel" %% "cats-effect-testing-specs2" % V.specs2CE % Test + val specs2CE = "org.typelevel" %% "cats-effect-testing-specs2" % V.specs2CE % Test // Test Akka val akkaTestkit = "com.typesafe.akka" %% "akka-testkit" % V.akka % Test @@ -127,13 +129,14 @@ object Dependencies { val http4sClient = "org.http4s" %% "http4s-blaze-client" % V.blaze % IntegrationTest } - // Integration test legacy - object LegacyIT { + object Legacy { val testcontainers = "com.dimafeng" %% "testcontainers-scala-core" % V.testcontainers % IntegrationTest val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % IntegrationTest - val specs2CE = "com.codecommit" %% "cats-effect-testing-specs2" % V.LegacyIT.specs2CE % IntegrationTest - val catsRetry = "com.github.cb372" %% "cats-retry" % V.LegacyIT.catsRetry % IntegrationTest - val http4sClient = "org.http4s" %% "http4s-blaze-client" % V.LegacyIT.http4s % IntegrationTest + val specs2CE = "com.codecommit" %% "cats-effect-testing-specs2" % V.Legacy.specs2CE % IntegrationTest + val catsRetry = "com.github.cb372" %% "cats-retry" % V.Legacy.catsRetry % IntegrationTest + val http4sClient = "org.http4s" %% "http4s-blaze-client" % V.Legacy.http4s % IntegrationTest + val trackerCore = "com.snowplowanalytics" %% "snowplow-scala-tracker-core" % V.Legacy.tracker + val trackerEmitterId = "com.snowplowanalytics" %% "snowplow-scala-tracker-emitter-id" % V.Legacy.tracker } } } diff --git a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PubSubCollector.scala b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PubSubCollector.scala index 6a1648ca6..cc71cf6ee 100644 --- a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PubSubCollector.scala +++ b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PubSubCollector.scala @@ -3,7 +3,7 @@ package com.snowplowanalytics.snowplow.collectors.scalastream import cats.effect._ import cats.effect.kernel.Resource import com.snowplowanalytics.snowplow.collector.core.model.Sinks -import com.snowplowanalytics.snowplow.collector.core.{App, Config} +import com.snowplowanalytics.snowplow.collector.core.{App, Config, Telemetry} import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.{PubSubSink, PubSubSinkConfig} object PubSubCollector extends App[PubSubSinkConfig](BuildInfo) { @@ -13,4 +13,10 @@ object PubSubCollector extends App[PubSubSinkConfig](BuildInfo) { good <- PubSubSink.create[IO](config.sink.maxBytes, config.sink, config.buffer, config.good) bad <- PubSubSink.create[IO](config.sink.maxBytes, config.sink, config.buffer, config.bad) } yield Sinks(good, bad) + + override def telemetryInfo(config: Config[PubSubSinkConfig]): Telemetry.TelemetryInfo = + Telemetry.TelemetryInfo( + region = None, + cloud = Some("GCP") + ) } diff --git a/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala b/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala index e2bbba7e9..fbc2f4ae9 100644 --- a/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala +++ b/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala @@ -119,6 +119,19 @@ object ConfigSpec { startupCheckInterval = 1.second, retryInterval = 10.seconds ) + ), + telemetry = Config.Telemetry( + disable = false, + interval = 60.minutes, + method = "POST", + url = "telemetry-g.snowplowanalytics.com", + port = 443, + secure = true, + userProvidedId = None, + moduleName = None, + moduleVersion = None, + instanceId = None, + autoGeneratedId = None ) ) diff --git a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala index 2e973a0c0..f61cbf4d6 100644 --- a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala +++ b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala @@ -9,11 +9,9 @@ package com.snowplowanalytics.snowplow.collectors.scalastream import java.util.concurrent.ScheduledThreadPoolExecutor - import cats.effect.{IO, Resource} - import com.snowplowanalytics.snowplow.collector.core.model.Sinks -import com.snowplowanalytics.snowplow.collector.core.{App, Config} +import com.snowplowanalytics.snowplow.collector.core.{App, Config, Telemetry} import com.snowplowanalytics.snowplow.collectors.scalastream.sinks._ object SqsCollector extends App[SqsSinkConfig](BuildInfo) { @@ -37,4 +35,10 @@ object SqsCollector extends App[SqsSinkConfig](BuildInfo) { ) } yield Sinks(good, bad) } + + override def telemetryInfo(config: Config[SqsSinkConfig]): Telemetry.TelemetryInfo = + Telemetry.TelemetryInfo( + region = Some(config.streams.sink.region), + cloud = Some("AWS") + ) } diff --git a/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala b/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala index b178185fa..a9054d8c8 100644 --- a/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala +++ b/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala @@ -125,6 +125,19 @@ object SqsConfigSpec { ), threadPoolSize = 10 ) + ), + telemetry = Config.Telemetry( + disable = false, + interval = 60.minutes, + method = "POST", + url = "telemetry-g.snowplowanalytics.com", + port = 443, + secure = true, + userProvidedId = None, + moduleName = None, + moduleVersion = None, + instanceId = None, + autoGeneratedId = None ) ) diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala index 5f4dd8659..ac8070eb4 100644 --- a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala +++ b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala @@ -3,8 +3,7 @@ package com.snowplowanalytics.snowplow.collector.stdout import cats.effect.IO import cats.effect.kernel.Resource import com.snowplowanalytics.snowplow.collector.core.model.Sinks -import com.snowplowanalytics.snowplow.collector.core.App -import com.snowplowanalytics.snowplow.collector.core.Config +import com.snowplowanalytics.snowplow.collector.core.{App, Config, Telemetry} object StdoutCollector extends App[SinkConfig](BuildInfo) { @@ -13,4 +12,7 @@ object StdoutCollector extends App[SinkConfig](BuildInfo) { val bad = new PrintingSink[IO](config.sink.maxBytes, System.err) Resource.pure(Sinks(good, bad)) } + + override def telemetryInfo(config: Config[SinkConfig]): Telemetry.TelemetryInfo = + Telemetry.TelemetryInfo(None, None) }