Skip to content

Commit

Permalink
Add http4s GET and HEAD endpoints (close #369)
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes authored and peel committed Nov 10, 2023
1 parent 14cc2ef commit 958ffb6
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,48 @@ class CollectorRoutes[F[_]: Sync](collectorService: Service[F]) extends Http4sDs
}

private val cookieRoutes = HttpRoutes.of[F] {
case req @ POST -> Root / vendor / version =>
case req @ (POST | GET | HEAD) -> Root / vendor / version =>
val path = collectorService.determinePath(vendor, version)
val userAgent = extractHeader(req, "User-Agent")
val referer = extractHeader(req, "Referer")
val spAnonymous = extractHeader(req, "SP-Anonymous")
val hostname = req.remoteHost.map(_.map(_.toString))
val ip = req.remoteAddr.map(_.toUriString)

collectorService.cookie(
queryString = Some(req.queryString),
body = req.bodyText.compile.string.map(Some(_)),
path = path,
cookie = None, //TODO: cookie will be added later
userAgent = userAgent,
refererUri = referer,
hostname = req.remoteHost.map(_.map(_.toString)),
ip = req.remoteAddr.map(_.toUriString), // TODO: Do not set the ip if request contains SP-Anonymous header
request = req,
pixelExpected = false,
doNotTrack = false,
contentType = req.contentType.map(_.value.toLowerCase),
spAnonymous = spAnonymous
)
req.method match {
case POST =>
collectorService.cookie(
queryString = Some(req.queryString),
body = req.bodyText.compile.string.map(Some(_)),
path = path,
cookie = None, //TODO: cookie will be added later
userAgent = userAgent,
refererUri = referer,
hostname = hostname,
ip = ip, // TODO: Do not set the ip if request contains SP-Anonymous header
request = req,
pixelExpected = false,
doNotTrack = false,
contentType = req.contentType.map(_.value.toLowerCase),
spAnonymous = spAnonymous
)
case GET | HEAD =>
collectorService.cookie(
queryString = Some(req.queryString),
body = Sync[F].delay(None),
path = path,
cookie = None, //TODO: cookie will be added later
userAgent = userAgent,
refererUri = referer,
hostname = hostname,
ip = ip, // TODO: Do not set the ip if request contains SP-Anonymous header
request = req,
pixelExpected = true,
doNotTrack = false,
contentType = None,
spAnonymous = spAnonymous
)
}
}

val value: HttpApp[F] = (healthRoutes <+> cookieRoutes).orNotFound
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package com.snowplowanalytics.snowplow.collectors.scalastream

import java.util.UUID

import org.apache.commons.codec.binary.Base64

import scala.concurrent.duration._
import scala.collection.JavaConverters._

import cats.effect.{Clock, Sync}
import cats.implicits._

import fs2.Stream

import org.http4s._
import org.http4s.headers._
import org.http4s.implicits._
Expand Down Expand Up @@ -38,13 +42,20 @@ trait Service[F[_]] {
def determinePath(vendor: String, version: String): String
}

object CollectorService {
// Contains an invisible pixel to return for `/i` requests.
val pixel = Base64.decodeBase64("R0lGODlhAQABAPAAAP///wAAACH5BAEAAAAALAAAAAABAAEAAAICRAEAOw==")
}

class CollectorService[F[_]: Sync](
config: CollectorConfig,
sinks: CollectorSinks[F],
appName: String,
appVersion: String
) extends Service[F] {

val pixelStream = Stream.iterable[F, Byte](CollectorService.pixel)

// TODO: Add sink type as well
private val collector = s"$appName-$appVersion"

Expand All @@ -70,8 +81,7 @@ class CollectorService[F[_]: Sync](
hostname <- hostname
// TODO: Get ipAsPartitionKey from config
(ipAddress, partitionKey) = ipAndPartitionKey(ip, ipAsPartitionKey = false)
// TODO: nuid should be set properly
nuid = UUID.randomUUID().toString
nuid = UUID.randomUUID().toString // TODO: nuid should be set properly
event = buildEvent(
queryString,
body,
Expand All @@ -93,9 +103,13 @@ class CollectorService[F[_]: Sync](
spAnonymous = spAnonymous,
now = now
)
responseHeaders = Headers(setCookie.toList.map(_.toRaw1))
headerList = List(
setCookie.map(_.toRaw1),
cacheControl(pixelExpected).map(_.toRaw1)
).flatten
responseHeaders = Headers(headerList)
_ <- sinkEvent(event, partitionKey)
} yield buildHttpResponse(responseHeaders)
} yield buildHttpResponse(responseHeaders, pixelExpected)

def determinePath(vendor: String, version: String): String = {
val original = s"/$vendor/$version"
Expand Down Expand Up @@ -135,8 +149,23 @@ class CollectorService[F[_]: Sync](
}

// TODO: Handle necessary cases to build http response in here
def buildHttpResponse(headers: Headers): Response[F] =
Response(status = Ok, headers = headers)
def buildHttpResponse(
headers: Headers,
pixelExpected: Boolean
): Response[F] =
pixelExpected match {
case true =>
Response[F](
headers = headers.put(`Content-Type`(MediaType.image.gif)),
body = pixelStream
)
// See https://github.com/snowplow/snowplow-javascript-tracker/issues/482
case false =>
Response[F](
status = Ok,
headers = headers
).withEntity("ok")
}

// TODO: Since Remote-Address and Raw-Request-URI is akka-specific headers,
// they aren't included in here. It might be good to search for counterparts in Http4s.
Expand All @@ -153,6 +182,12 @@ class CollectorService[F[_]: Sync](
}
}

/** If the pixel is requested, this attaches cache control headers to the response to prevent any caching. */
def cacheControl(pixelExpected: Boolean): Option[`Cache-Control`] =
if (pixelExpected)
Some(`Cache-Control`(CacheDirective.`no-cache`(), CacheDirective.`no-store`, CacheDirective.`must-revalidate`))
else None

/** Produces the event to the configured sink. */
def sinkEvent(
event: CollectorPayload,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,41 @@
package com.snowplowanalytics.snowplow.collectors.scalastream

import scala.collection.mutable.ListBuffer
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import org.http4s.implicits.http4sLiteralsSyntax
import org.http4s.{Method, Request, RequestCookie, Response, Status}
import com.comcast.ip4s.SocketAddress
import org.http4s.implicits._
import org.http4s._
import org.http4s.headers._
import org.http4s.Status._
import fs2.{Stream, text}
import org.typelevel.ci._
import org.specs2.mutable.Specification

class CollectorRoutesSpec extends Specification {

val collectorService = new Service[IO] {
case class CookieParams(
queryString: Option[String],
body: IO[Option[String]],
path: String,
cookie: Option[RequestCookie],
userAgent: Option[String],
refererUri: Option[String],
hostname: IO[Option[String]],
ip: Option[String],
request: Request[IO],
pixelExpected: Boolean,
doNotTrack: Boolean,
contentType: Option[String],
spAnonymous: Option[String]
)

class TestService() extends Service[IO] {

private val cookieCalls: ListBuffer[CookieParams] = ListBuffer()

def getCookieCalls: List[CookieParams] = cookieCalls.toList

override def cookie(
queryString: Option[String],
body: IO[Option[String]],
Expand All @@ -26,28 +51,115 @@ class CollectorRoutesSpec extends Specification {
contentType: Option[String],
spAnonymous: Option[String]
): IO[Response[IO]] =
IO.pure(Response(status = Ok, body = Stream.emit("cookie").through(text.utf8.encode)))
IO.delay {
cookieCalls += CookieParams(
queryString,
body,
path,
cookie,
userAgent,
refererUri,
hostname,
ip,
request,
pixelExpected,
doNotTrack,
contentType,
spAnonymous
)
Response(status = Ok, body = Stream.emit("cookie").through(text.utf8.encode))
}

override def determinePath(vendor: String, version: String): String = "/p1/p2"
}
val routes = new CollectorRoutes[IO](collectorService).value

val testConnection = Request.Connection(
local = SocketAddress.fromStringIp("127.0.0.1:80").get,
remote = SocketAddress.fromStringIp("127.0.0.1:80").get,
secure = false
)

val testHeaders = Headers(
`User-Agent`(ProductId("testUserAgent")),
Referer(Uri.unsafeFromString("example.com")),
Header.Raw(ci"SP-Anonymous", "*"),
`Content-Type`(MediaType.application.json)
)

def createTestServices = {
val collectorService = new TestService()
val routes = new CollectorRoutes[IO](collectorService).value
(collectorService, routes)
}

"The collector route" should {
"respond to the health route with an ok response" in {
val request = Request[IO](method = Method.GET, uri = uri"/health")
val response = routes.run(request).unsafeRunSync()
val (_, routes) = createTestServices
val request = Request[IO](method = Method.GET, uri = uri"/health")
val response = routes.run(request).unsafeRunSync()

response.status must beEqualTo(Status.Ok)
response.as[String].unsafeRunSync() must beEqualTo("OK")
}

"respond to the post cookie route with the cookie response" in {
val request = Request[IO](method = Method.POST, uri = uri"/p1/p2")
val (collectorService, routes) = createTestServices

val request = Request[IO](method = Method.POST, uri = uri"/p3/p4?a=b&c=d")
.withAttribute(Request.Keys.ConnectionInfo, testConnection)
.withEntity("testBody")
.withHeaders(testHeaders)
val response = routes.run(request).unsafeRunSync()

val List(cookieParams) = collectorService.getCookieCalls
cookieParams.queryString shouldEqual Some("a=b&c=d")
cookieParams.body.unsafeRunSync() shouldEqual Some("testBody")
cookieParams.path shouldEqual "/p1/p2"
cookieParams.cookie shouldEqual None
cookieParams.userAgent shouldEqual Some("testUserAgent")
cookieParams.refererUri shouldEqual Some("example.com")
cookieParams.hostname.unsafeRunSync() shouldEqual Some("localhost")
cookieParams.ip shouldEqual Some("127.0.0.1")
cookieParams.pixelExpected shouldEqual false
cookieParams.doNotTrack shouldEqual false
cookieParams.contentType shouldEqual Some("application/json")
cookieParams.spAnonymous shouldEqual Some("*")

response.status must beEqualTo(Status.Ok)
response.bodyText.compile.string.unsafeRunSync() must beEqualTo("cookie")
}

"respond to the get or head cookie route with the cookie response" in {
def getHeadTest(method: Method) = {
val (collectorService, routes) = createTestServices

val request = Request[IO](method = method, uri = uri"/p3/p4?a=b&c=d")
.withAttribute(Request.Keys.ConnectionInfo, testConnection)
.withEntity("testBody")
.withHeaders(testHeaders)
val response = routes.run(request).unsafeRunSync()

val List(cookieParams) = collectorService.getCookieCalls
cookieParams.queryString shouldEqual Some("a=b&c=d")
cookieParams.body.unsafeRunSync() shouldEqual None
cookieParams.path shouldEqual "/p1/p2"
cookieParams.cookie shouldEqual None
cookieParams.userAgent shouldEqual Some("testUserAgent")
cookieParams.refererUri shouldEqual Some("example.com")
cookieParams.hostname.unsafeRunSync() shouldEqual Some("localhost")
cookieParams.ip shouldEqual Some("127.0.0.1")
cookieParams.pixelExpected shouldEqual true
cookieParams.doNotTrack shouldEqual false
cookieParams.contentType shouldEqual None
cookieParams.spAnonymous shouldEqual Some("*")

response.status must beEqualTo(Status.Ok)
response.bodyText.compile.string.unsafeRunSync() must beEqualTo("cookie")
}

getHeadTest(Method.GET)
getHeadTest(Method.HEAD)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ class CollectorServiceSpec extends Specification {
)
val event = new CollectorPayload("iglu-schema", "ip", System.currentTimeMillis, "UTF-8", "collector")
val uuidRegex = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}".r
val hs = Headers(
`X-Forwarded-For`(IpAddress.fromString("127.0.0.1")),
Cookie(RequestCookie("cookie", "value")),
`Access-Control-Allow-Credentials`()
)

def probeService(): ProbeService = {
val good = new TestSink
Expand Down Expand Up @@ -162,6 +167,30 @@ class CollectorServiceSpec extends Specification {
"image/gif"
).asJava
}

"return necessary cache control headers and respond with pixel when pixelExpected is true" in {
val r = service
.cookie(
queryString = Some("nuid=12"),
body = IO.pure(Some("b")),
path = "p",
cookie = None,
userAgent = None,
refererUri = None,
hostname = IO.pure(Some("h")),
ip = None,
request = Request[IO](),
pixelExpected = true,
doNotTrack = false,
contentType = None,
spAnonymous = Some("*")
)
.unsafeRunSync()
r.headers.get[`Cache-Control`] shouldEqual Some(
`Cache-Control`(CacheDirective.`no-cache`(), CacheDirective.`no-store`, CacheDirective.`must-revalidate`)
)
r.body.compile.toList.unsafeRunSync().toArray shouldEqual CollectorService.pixel
}
}

"buildEvent" in {
Expand Down Expand Up @@ -235,6 +264,20 @@ class CollectorServiceSpec extends Specification {
}
}

"buildHttpResponse" in {
"send back a gif if pixelExpected is true" in {
val res = service.buildHttpResponse(hs, pixelExpected = true)
res.status shouldEqual Status.Ok
res.headers shouldEqual hs.put(`Content-Type`(MediaType.image.gif))
res.body.compile.toList.unsafeRunSync().toArray shouldEqual CollectorService.pixel
}
"send back ok otherwise" in {
val res = service.buildHttpResponse(hs, pixelExpected = false)
res.status shouldEqual Status.Ok
res.bodyText.compile.toList.unsafeRunSync() shouldEqual List("ok")
}
}

"ipAndPartitionkey" in {
"give back the ip and partition key as ip if remote address is defined" in {
val address = Some("127.0.0.1")
Expand Down

0 comments on commit 958ffb6

Please sign in to comment.