Skip to content

Commit

Permalink
Add http4s graceful shutdown (close #365)
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter authored and peel committed Nov 10, 2023
1 parent 23b1121 commit c31a352
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.snowplowanalytics.snowplow.collectors.scalastream

import cats.implicits._
import cats.effect.{ExitCode, IO}
import cats.effect.{Async, ExitCode, Sync}
import cats.effect.kernel.Resource
import fs2.io.net.Network
import com.comcast.ip4s.IpLiteralSyntax
import org.http4s.HttpApp
import org.http4s.server.Server
import org.http4s.ember.server.EmberServerBuilder
import org.http4s.blaze.server.BlazeServerBuilder
Expand All @@ -12,47 +14,66 @@ import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import java.net.InetSocketAddress
import scala.concurrent.duration.DurationLong
import scala.concurrent.duration.{DurationLong, FiniteDuration}

object CollectorApp {

implicit private def unsafeLogger: Logger[IO] =
Slf4jLogger.getLogger[IO]
implicit private def unsafeLogger[F[_]: Sync]: Logger[F] =
Slf4jLogger.getLogger[F]

def run(): IO[ExitCode] =
buildHttpServer().use(_ => IO.never).as(ExitCode.Success)
def run[F[_]: Async](mkGood: Resource[F, Sink[F]], mkBad: Resource[F, Sink[F]]): F[ExitCode] = {
val resources = for {
bad <- mkBad
good <- mkGood
_ <- withGracefulShutdown(610.seconds) {
buildHttpServer[F](new CollectorRoutes[F](good, bad).value)
}
} yield ()

private def buildHttpServer(): Resource[IO, Server] =
resources.surround(Async[F].never[ExitCode])
}

private def withGracefulShutdown[F[_]: Async, A](delay: FiniteDuration)(resource: Resource[F, A]): Resource[F, A] =
for {
a <- resource
_ <- Resource.onFinalizeCase {
case Resource.ExitCase.Canceled =>
Logger[F].warn(s"Shutdown interrupted. Will continue to serve requests for $delay") >>
Async[F].sleep(delay)
case _ =>
Async[F].unit
}
} yield a

private def buildHttpServer[F[_]: Async](app: HttpApp[F]): Resource[F, Server] =
sys.env.get("HTTP4S_BACKEND").map(_.toUpperCase()) match {
case Some("EMBER") | None => buildEmberServer
case Some("BLAZE") => buildBlazeServer
case Some("NETTY") => buildNettyServer
case Some("EMBER") | None => buildEmberServer[F](app)
case Some("BLAZE") => buildBlazeServer[F](app)
case Some("NETTY") => buildNettyServer[F](app)
case Some(other) => throw new IllegalArgumentException(s"Unrecognized http4s backend $other")
}

private def buildEmberServer =
Resource.eval(Logger[IO].info("Building ember server")) >>
private def buildEmberServer[F[_]: Async](app: HttpApp[F]) = {
implicit val network = Network.forAsync[F]
Resource.eval(Logger[F].info("Building ember server")) >>
EmberServerBuilder
.default[IO]
.default[F]
.withHost(ipv4"0.0.0.0")
.withPort(port"8080")
.withHttpApp(new CollectorRoutes[IO].value)
.withHttpApp(app)
.withIdleTimeout(610.seconds)
.build
}

private def buildBlazeServer: Resource[IO, Server] =
Resource.eval(Logger[IO].info("Building blaze server")) >>
BlazeServerBuilder[IO]
private def buildBlazeServer[F[_]: Async](app: HttpApp[F]): Resource[F, Server] =
Resource.eval(Logger[F].info("Building blaze server")) >>
BlazeServerBuilder[F]
.bindSocketAddress(new InetSocketAddress(8080))
.withHttpApp(new CollectorRoutes[IO].value)
.withHttpApp(app)
.withIdleTimeout(610.seconds)
.resource

private def buildNettyServer: Resource[IO, Server] =
Resource.eval(Logger[IO].info("Building netty server")) >>
NettyServerBuilder[IO]
.bindLocal(8080)
.withHttpApp(new CollectorRoutes[IO].value)
.withIdleTimeout(610.seconds)
.resource
private def buildNettyServer[F[_]: Async](app: HttpApp[F]): Resource[F, Server] =
Resource.eval(Logger[F].info("Building netty server")) >>
NettyServerBuilder[F].bindLocal(8080).withHttpApp(app).withIdleTimeout(610.seconds).resource
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import cats.effect.Sync
import org.http4s.{HttpApp, HttpRoutes}
import org.http4s.dsl.Http4sDsl

class CollectorRoutes[F[_]: Sync]() extends Http4sDsl[F] {
class CollectorRoutes[F[_]: Sync](good: Sink[F], bad: Sink[F]) extends Http4sDsl[F] {

val _ = (good, bad)

lazy val value: HttpApp[F] = HttpRoutes
.of[F] {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.snowplowanalytics.snowplow.collectors.scalastream

trait Sink[F[_]] {

// Maximum number of bytes that a single record can contain.
// If a record is bigger, a size violation bad row is emitted instead
val maxBytes: Int

def isHealthy: F[Boolean]
def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit]
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ class CollectorRoutesSpec extends Specification {
"Health endpoint" should {
"return OK always because collector always works" in {
val request = Request[IO](method = Method.GET, uri = uri"/health")
val response = new CollectorRoutes[IO].value.run(request).unsafeRunSync()
val routes = new CollectorRoutes[IO](CollectorTestUtils.noopSink, CollectorTestUtils.noopSink)
val response = routes.value.run(request).unsafeRunSync()

response.status must beEqualTo(Status.Ok)
response.as[String].unsafeRunSync() must beEqualTo("OK")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.snowplowanalytics.snowplow.collectors.scalastream

import cats.Applicative

object CollectorTestUtils {

def noopSink[F[_]: Applicative]: Sink[F] = new Sink[F] {
val maxBytes: Int = Int.MaxValue
def isHealthy: F[Boolean] = Applicative[F].pure(true)
def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] = Applicative[F].unit
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,32 @@
*/
package com.snowplowanalytics.snowplow.collectors.scalastream

import cats.effect.{ExitCode, IO, IOApp}
import cats.effect.{ExitCode, IO, IOApp, Sync}
import cats.effect.kernel.Resource
import cats.implicits._

import java.util.Base64
import java.io.PrintStream

object StdoutCollector extends IOApp {

def run(args: List[String]): IO[ExitCode] =
CollectorApp.run()
def run(args: List[String]): IO[ExitCode] = {
val good = Resource.pure[IO, Sink[IO]](printingSink(System.out))
val bad = Resource.pure[IO, Sink[IO]](printingSink(System.err))
CollectorApp.run[IO](good, bad)
}

private def printingSink[F[_]: Sync](stream: PrintStream): Sink[F] = new Sink[F] {
val maxBytes = Int.MaxValue // TODO: configurable?
def isHealthy: F[Boolean] = Sync[F].pure(true)

val encoder = Base64.getEncoder().withoutPadding()

def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] =
events.traverse_ { e =>
Sync[F].delay {
stream.println(encoder.encodeToString(e))
}
}
}
}

0 comments on commit c31a352

Please sign in to comment.