Skip to content

Commit

Permalink
Add telemetry support (close #381)
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Nov 9, 2023
1 parent 10317be commit f11ca01
Show file tree
Hide file tree
Showing 17 changed files with 318 additions and 48 deletions.
15 changes: 9 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,17 @@ lazy val commonDependencies = Seq(
Dependencies.Libraries.badRows,
Dependencies.Libraries.collectorPayload,
Dependencies.Libraries.pureconfig,
Dependencies.Libraries.trackerCore,
Dependencies.Libraries.trackerEmitterId,
Dependencies.Libraries.Legacy.trackerCore,
Dependencies.Libraries.Legacy.trackerEmitterId,
// Unit tests
Dependencies.Libraries.akkaTestkit,
Dependencies.Libraries.akkaHttpTestkit,
Dependencies.Libraries.akkaStreamTestkit,
Dependencies.Libraries.specs2,
// Integration tests
Dependencies.Libraries.LegacyIT.testcontainers,
Dependencies.Libraries.LegacyIT.http4sClient,
Dependencies.Libraries.LegacyIT.catsRetry
Dependencies.Libraries.Legacy.testcontainers,
Dependencies.Libraries.Legacy.http4sClient,
Dependencies.Libraries.Legacy.catsRetry
)

lazy val commonExclusions = Seq(
Expand Down Expand Up @@ -101,7 +101,7 @@ lazy val dynVerSettings = Seq(
)

lazy val http4sBuildInfoSettings = Seq(
buildInfoKeys := Seq[BuildInfoKey](name, dockerAlias, version),
buildInfoKeys := Seq[BuildInfoKey](name, moduleName, dockerAlias, version),
buildInfoOptions += BuildInfoOption.Traits("com.snowplowanalytics.snowplow.collector.core.AppInfo")
)

Expand Down Expand Up @@ -134,6 +134,7 @@ lazy val http4s = project
Dependencies.Libraries.http4sEmber,
Dependencies.Libraries.http4sBlaze,
Dependencies.Libraries.http4sNetty,
Dependencies.Libraries.http4sClient,
Dependencies.Libraries.log4cats,
Dependencies.Libraries.thrift,
Dependencies.Libraries.badRows,
Expand All @@ -142,6 +143,8 @@ lazy val http4s = project
Dependencies.Libraries.decline,
Dependencies.Libraries.circeGeneric,
Dependencies.Libraries.circeConfig,
Dependencies.Libraries.trackerCore,
Dependencies.Libraries.emitterHttps,
Dependencies.Libraries.specs2,
Dependencies.Libraries.specs2CE,

Expand Down
7 changes: 7 additions & 0 deletions examples/config.kinesis.extended.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,13 @@ collector {
url = telemetry-g.snowplowanalytics.com
port = 443
secure = true

# Identifiers used by collector terraform module
userProvidedId = my_pipeline,
moduleName = collector-kinesis-ec2,
moduleVersion = 0.5.2,
instanceId = 665bhft5u6udjf,
autoGeneratedId = hfy67e5ydhtrd
}

monitoring.metrics.statsd {
Expand Down
9 changes: 9 additions & 0 deletions http4s/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@
}
}

telemetry {
disable = false
interval = 60 minutes
method = POST
url = telemetry-g.snowplowanalytics.com
port = 443
secure = true
}

monitoring {
metrics {
statsd {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import com.monovore.decline.Opts

import io.circe.Decoder

import com.snowplowanalytics.snowplow.scalatracker.emitters.http4s.ceTracking

import com.snowplowanalytics.snowplow.collector.core.model.Sinks

abstract class App[SinkConfig <: Config.Sink: Decoder](appInfo: AppInfo)
Expand All @@ -19,7 +21,9 @@ abstract class App[SinkConfig <: Config.Sink: Decoder](appInfo: AppInfo)

def mkSinks(config: Config.Streams[SinkConfig]): Resource[IO, Sinks[IO]]

final def main: Opts[IO[ExitCode]] = Run.fromCli[IO, SinkConfig](appInfo, mkSinks)
def telemetryInfo(config: Config[SinkConfig]): Telemetry.TelemetryInfo

final def main: Opts[IO[ExitCode]] = Run.fromCli[IO, SinkConfig](appInfo, mkSinks, telemetryInfo)
}

object App {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.snowplowanalytics.snowplow.collector.core

trait AppInfo {
def name: String
def moduleName: String
def version: String
def dockerAlias: String
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.snowplowanalytics.snowplow.collector.core

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._

import io.circe.config.syntax._
import io.circe.generic.semiauto._
Expand All @@ -23,6 +23,7 @@ case class Config[+SinkConfig](
cors: Config.CORS,
streams: Config.Streams[SinkConfig],
monitoring: Config.Monitoring,
telemetry: Config.Telemetry,
ssl: Config.SSL,
enableDefaultRedirect: Boolean,
redirectDomains: Set[String],
Expand Down Expand Up @@ -122,6 +123,23 @@ object Config {
port: Int
)

final case class Telemetry(
// General params
disable: Boolean,
interval: FiniteDuration,
// http params
method: String,
url: String,
port: Int,
secure: Boolean,
// Params injected by deployment scripts
userProvidedId: Option[String],
moduleName: Option[String],
moduleVersion: Option[String],
instanceId: Option[String],
autoGeneratedId: Option[String]
)

implicit def decoder[SinkConfig: Decoder]: Decoder[Config[SinkConfig]] = {
implicit val p3p = deriveDecoder[P3P]
implicit val crossDomain = deriveDecoder[CrossDomain]
Expand All @@ -147,6 +165,7 @@ object Config {
implicit val metrics = deriveDecoder[Metrics]
implicit val monitoring = deriveDecoder[Monitoring]
implicit val ssl = deriveDecoder[SSL]
implicit val telemetry = deriveDecoder[Telemetry]
deriveDecoder[Config[SinkConfig]]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,38 @@ import cats.data.EitherT
import cats.effect.{Async, ExitCode, Sync}
import cats.effect.kernel.Resource

import org.http4s.blaze.client.BlazeClientBuilder

import com.monovore.decline.Opts

import io.circe.Decoder

import com.snowplowanalytics.snowplow.scalatracker.Tracking

import com.snowplowanalytics.snowplow.collector.core.model.Sinks

object Run {

implicit private def logger[F[_]: Sync] = Slf4jLogger.getLogger[F]

def fromCli[F[_]: Async, SinkConfig: Decoder](
def fromCli[F[_]: Async: Tracking, SinkConfig: Decoder](
appInfo: AppInfo,
mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]]
mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]],
telemetryInfo: Config[SinkConfig] => Telemetry.TelemetryInfo
): Opts[F[ExitCode]] = {
val configPath = Opts.option[Path]("config", "Path to HOCON configuration (optional)", "c", "config.hocon").orNone
configPath.map(fromPath[F, SinkConfig](appInfo, mkSinks, _))
configPath.map(fromPath[F, SinkConfig](appInfo, mkSinks, telemetryInfo, _))
}

private def fromPath[F[_]: Async, SinkConfig: Decoder](
private def fromPath[F[_]: Async: Tracking, SinkConfig: Decoder](
appInfo: AppInfo,
mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]],
telemetryInfo: Config[SinkConfig] => Telemetry.TelemetryInfo,
path: Option[Path]
): F[ExitCode] = {
val eitherT = for {
config <- ConfigParser.fromPath[F, SinkConfig](path)
_ <- EitherT.right[ExitCode](fromConfig(appInfo, mkSinks, config))
_ <- EitherT.right[ExitCode](fromConfig(appInfo, mkSinks, config, telemetryInfo))
} yield ExitCode.Success

eitherT.merge.handleErrorWith { e =>
Expand All @@ -47,10 +53,11 @@ object Run {
}
}

private def fromConfig[F[_]: Async, SinkConfig](
private def fromConfig[F[_]: Async: Tracking, SinkConfig](
appInfo: AppInfo,
mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]],
config: Config[SinkConfig]
config: Config[SinkConfig],
telemetryInfo: Config[SinkConfig] => Telemetry.TelemetryInfo
): F[ExitCode] = {
val resources = for {
sinks <- mkSinks(config.streams)
Expand All @@ -65,10 +72,23 @@ object Run {
if (config.ssl.enable) config.ssl.port else config.port,
config.ssl.enable
)
_ <- withGracefulShutdown(config.preTerminationPeriod)(httpServer)
} yield ()

resources.surround(Async[F].never[ExitCode])
_ <- withGracefulShutdown(config.preTerminationPeriod)(httpServer)
httpClient <- BlazeClientBuilder[F].resource
} yield httpClient

resources.use { httpClient =>
Telemetry
.run(
config.telemetry,
httpClient,
appInfo,
telemetryInfo(config).region,
telemetryInfo(config).cloud
)
.compile
.drain
.flatMap(_ => Async[F].never[ExitCode])
}
}

private def prettyLogException[F[_]: Sync](e: Throwable): F[Unit] = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package com.snowplowanalytics.snowplow.collector.core

import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import cats.data.NonEmptyList
import cats.implicits._

import cats.effect.{Async, Resource, Sync}
import cats.effect.std.Random

import fs2.Stream

import org.http4s.client.{Client => HttpClient}

import _root_.io.circe.Json
import _root_.io.circe.syntax._

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}

import com.snowplowanalytics.snowplow.scalatracker.{Tracker, Tracking}
import com.snowplowanalytics.snowplow.scalatracker.Emitter._
import com.snowplowanalytics.snowplow.scalatracker.Emitter.{Result => TrackerResult}
import com.snowplowanalytics.snowplow.scalatracker.emitters.http4s.Http4sEmitter

object Telemetry {

implicit private def unsafeLogger[F[_]: Sync]: Logger[F] =
Slf4jLogger.getLogger[F]

def run[F[_]: Async: Tracking](
telemetryConfig: Config.Telemetry,
httpClient: HttpClient[F],
appInfo: AppInfo,
region: Option[String],
cloud: Option[String]
): Stream[F, Unit] =
if (telemetryConfig.disable)
Stream.empty.covary[F]
else {
val sdj = makeHeartbeatEvent(
telemetryConfig,
region,
cloud,
appInfo.moduleName,
appInfo.version
)
Stream.resource(initTracker(telemetryConfig, appInfo.moduleName, httpClient)).flatMap { tracker =>
Stream.fixedDelay[F](telemetryConfig.interval).evalMap { _ =>
tracker.trackSelfDescribingEvent(unstructEvent = sdj) >> tracker.flushEmitters()
}
}
}

private def initTracker[F[_]: Async: Tracking](
config: Config.Telemetry,
appName: String,
client: HttpClient[F]
): Resource[F, Tracker[F]] =
for {
random <- Resource.eval(Random.scalaUtilRandom[F])
emitter <- {
implicit val r: Random[F] = random
Http4sEmitter.build(
EndpointParams(config.url, port = Some(config.port), https = config.secure),
client,
retryPolicy = RetryPolicy.MaxAttempts(10),
callback = Some(emitterCallback[F] _)
)
}
} yield new Tracker(NonEmptyList.of(emitter), "tracker-telemetry", appName)

private def emitterCallback[F[_]: Sync](
params: EndpointParams,
req: Request,
res: TrackerResult
): F[Unit] =
res match {
case TrackerResult.Success(_) =>
Logger[F].debug(s"Telemetry heartbeat successfully sent to ${params.getGetUri}")
case TrackerResult.Failure(code) =>
Logger[F].warn(s"Sending telemetry hearbeat got unexpected HTTP code $code from ${params.getUri}")
case TrackerResult.TrackerFailure(exception) =>
Logger[F].warn(
s"Telemetry hearbeat failed to reach ${params.getUri} with following exception $exception after ${req.attempt} attempts"
)
case TrackerResult.RetriesExceeded(failure) =>
Logger[F].error(s"Stopped trying to send telemetry heartbeat after following failure: $failure")
}

private def makeHeartbeatEvent(
teleCfg: Config.Telemetry,
region: Option[String],
cloud: Option[String],
appName: String,
appVersion: String
): SelfDescribingData[Json] =
SelfDescribingData(
SchemaKey("com.snowplowanalytics.oss", "oss_context", "jsonschema", SchemaVer.Full(1, 0, 1)),
Json.obj(
"userProvidedId" -> teleCfg.userProvidedId.asJson,
"autoGeneratedId" -> teleCfg.autoGeneratedId.asJson,
"moduleName" -> teleCfg.moduleName.asJson,
"moduleVersion" -> teleCfg.moduleVersion.asJson,
"instanceId" -> teleCfg.instanceId.asJson,
"appGeneratedId" -> java.util.UUID.randomUUID.toString.asJson,
"cloud" -> cloud.asJson,
"region" -> region.asJson,
"applicationName" -> appName.asJson,
"applicationVersion" -> appVersion.asJson
)
)

case class TelemetryInfo(
region: Option[String],
cloud: Option[String]
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ object TestUtils {

val appInfo = new AppInfo {
def name = appName
def moduleName = appName
def version = appVersion
def dockerAlias = "docker run collector"
}
Expand Down Expand Up @@ -102,6 +103,19 @@ object TestUtils {
),
enableDefaultRedirect = false,
redirectDomains = Set.empty[String],
preTerminationPeriod = 10.seconds
preTerminationPeriod = 10.seconds,
telemetry = Config.Telemetry(
disable = false,
interval = 60.minutes,
method = "POST",
url = "telemetry-g.snowplowanalytics.com",
port = 443,
secure = true,
userProvidedId = None,
moduleName = None,
moduleVersion = None,
instanceId = None,
autoGeneratedId = None
)
)
}
Loading

0 comments on commit f11ca01

Please sign in to comment.