diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PrintingSink.scala b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PrintingSink.scala new file mode 100644 index 000000000..e6117809d --- /dev/null +++ b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PrintingSink.scala @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2013-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, and + * you may not use this file except in compliance with the Apache License + * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the Apache License Version 2.0 is distributed on an "AS + * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.collectors.scalastream + +import cats.effect.Sync +import cats.implicits._ + +import java.io.PrintStream +import java.util.Base64 + +class PrintingSink[F[_]: Sync](stream: PrintStream) extends Sink[F] { + private val encoder: Base64.Encoder = Base64.getEncoder.withoutPadding() + + override val maxBytes: Int = Int.MaxValue // TODO: configurable? + override def isHealthy: F[Boolean] = Sync[F].pure(true) + + override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] = + events.traverse_ { event => + Sync[F].delay { + stream.println(encoder.encodeToString(event)) + } + } +} diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala index fd89386ec..656611f9d 100644 --- a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala +++ b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala @@ -8,21 +8,16 @@ */ package com.snowplowanalytics.snowplow.collectors.scalastream -import cats.effect.{ExitCode, IO, IOApp, Sync} import cats.effect.kernel.Resource -import cats.implicits._ - -import java.util.Base64 -import java.io.PrintStream - -import com.snowplowanalytics.snowplow.collectors.scalastream.model._ +import cats.effect.{ExitCode, IO, IOApp} import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo +import com.snowplowanalytics.snowplow.collectors.scalastream.model._ object StdoutCollector extends IOApp { def run(args: List[String]): IO[ExitCode] = { - val good = Resource.pure[IO, Sink[IO]](printingSink(System.out)) - val bad = Resource.pure[IO, Sink[IO]](printingSink(System.err)) + val good = Resource.pure[IO, Sink[IO]](new PrintingSink[IO](System.out)) + val bad = Resource.pure[IO, Sink[IO]](new PrintingSink[IO](System.err)) CollectorApp.run[IO]( good, bad, @@ -31,18 +26,4 @@ object StdoutCollector extends IOApp { BuildInfo.version ) } - - private def printingSink[F[_]: Sync](stream: PrintStream): Sink[F] = new Sink[F] { - val maxBytes = Int.MaxValue // TODO: configurable? - def isHealthy: F[Boolean] = Sync[F].pure(true) - - val encoder = Base64.getEncoder().withoutPadding() - - def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] = - events.traverse_ { e => - Sync[F].delay { - stream.println(encoder.encodeToString(e)) - } - } - } } diff --git a/stdout/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PrintingSinkSpec.scala b/stdout/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PrintingSinkSpec.scala new file mode 100644 index 000000000..359006c92 --- /dev/null +++ b/stdout/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PrintingSinkSpec.scala @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2013-2022 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, and + * you may not use this file except in compliance with the Apache License + * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the Apache License Version 2.0 is distributed on an "AS + * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.collectors.scalastream.sinks + +import cats.effect.IO +import cats.effect.unsafe.implicits.global +import com.snowplowanalytics.snowplow.collectors.scalastream.PrintingSink +import org.specs2.mutable.Specification + +import java.io.{ByteArrayOutputStream, PrintStream} +import java.nio.charset.StandardCharsets + +class PrintingSinkSpec extends Specification { + + "Printing sink" should { + "print provided bytes encoded as BASE64 string" in { + val baos = new ByteArrayOutputStream() + val sink = new PrintingSink[IO](new PrintStream(baos)) + val input = "Something" + + sink.storeRawEvents(List(input.getBytes(StandardCharsets.UTF_8)), "key").unsafeRunSync() + + baos.toString(StandardCharsets.UTF_8) must beEqualTo("U29tZXRoaW5n\n") // base64 of 'Something' + newline + } + } +}