Skip to content

Commit

Permalink
Add integration test for X-Forwarded-For (close #288)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Feb 22, 2023
1 parent a4ed4bc commit 327bec5
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ object EventGenerator {
}
}

private def generateEvents(
def generateEvents(
collectorHost: String,
collectorPort: Int,
nbGood: Int,
Expand All @@ -49,11 +49,11 @@ object EventGenerator {
good ++ bad
}

private def mkTp2Event(
def mkTp2Event(
collectorHost: String,
collectorPort: Int,
valid: Boolean,
maxBytes: Int
valid: Boolean = true,
maxBytes: Int = 100
): Request[IO] = {
val uri = Uri.unsafeFromString(s"http://$collectorHost:$collectorPort/com.snowplowanalytics.snowplow/tp2")
val body = if (valid) "foo" else "a" * (maxBytes + 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,8 @@ class CustomPathsSpec extends Specification with Localstack with CatsIO {
streamBad,
Some(config)
).use { collector =>
val host = collector.getHost()
val port = collector.getMappedPort(Collector.port)
val requests = originalPaths.map { p =>
val uri = Uri.unsafeFromString(s"http://$host:$port$p")
val uri = Uri.unsafeFromString(s"http://${collector.host}:${collector.port}$p")
Request[IO](Method.POST, uri).withEntity("foo")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ class HealthEndpointSpec extends Specification with Localstack with CatsIO {
streamGood,
streamBad
).use { collector =>
val host = collector.getHost()
val port = collector.getMappedPort(Collector.port)
val uri = Uri.unsafeFromString(s"http://$host:$port/health")
val uri = Uri.unsafeFromString(s"http://${collector.host}:${collector.port}/health")
val request = Request[IO](Method.GET, uri)

for {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright (c) 2023-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0, and
* you may not use this file except in compliance with the Apache License
* Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Apache License Version 2.0 is distributed on an "AS
* IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.collectors.scalastream.it.core

import java.net.InetAddress

import scala.concurrent.duration._

import cats.data.NonEmptyList

import cats.effect.IO

import cats.effect.testing.specs2.CatsIO

import org.specs2.mutable.Specification

import org.http4s.headers.`X-Forwarded-For`

import com.snowplowanalytics.snowplow.collectors.scalastream.it.Http
import com.snowplowanalytics.snowplow.collectors.scalastream.it.EventGenerator

import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._
import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.Kinesis

class XForwardedForSpec extends Specification with Localstack with CatsIO {

override protected val Timeout = 5.minutes

"collector" should {
"put X-Forwarded-For header in the collector payload" in {
val testName = "X-Forwarded-For"
val streamGood = s"${testName}-raw"
val streamBad = s"${testName}-bad-1"

val ip = InetAddress.getByName("123.123.123.123")

Collector.container(
"kinesis/src/it/resources/collector.hocon",
testName,
streamGood,
streamBad
).use { collector =>
val request = EventGenerator.mkTp2Event(collector.host, collector.port)
.withHeaders(`X-Forwarded-For`(NonEmptyList.one(Some(ip))))

for {
_ <- Http.sendRequest(request)
_ <- IO.sleep(5.second)
collectorOutput <- Kinesis.readOutput(streamGood, streamBad)
} yield {
val expected = "X-Forwarded-For: 123.123.123.123"
collectorOutput.good match {
case List(one) if one.headers.contains(expected) => ok
case List(one) => ko(s"${one.headers} doesn't contain $expected")
case other => ko(s"${other.size} output collector payload instead of one")
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class KinesisCollectorSpec extends Specification with Localstack with CatsIO {
s"${testName}-raw",
s"${testName}-bad-1"
).use { collector =>
IO(collector.getLogs() must contain(("Setting health endpoint to healthy")))
IO(collector.container.getLogs() must contain(("Setting health endpoint to healthy")))
}
}

Expand All @@ -64,8 +64,8 @@ class KinesisCollectorSpec extends Specification with Localstack with CatsIO {
for {
_ <- log(testName, "Sending data")
_ <- EventGenerator.sendEvents(
collector.getHost(),
collector.getMappedPort(Collector.port),
collector.host,
collector.port,
nbGood,
nbBad,
Collector.maxBytes
Expand All @@ -90,13 +90,14 @@ class KinesisCollectorSpec extends Specification with Localstack with CatsIO {
s"${testName}-raw",
s"${testName}-bad-1"
).use { collector =>
val container = collector.container
for {
_ <- log(testName, "Sending signal")
_ <- IO(collector.getDockerClient().killContainerCmd(collector.getContainerId()).withSignal("TERM").exec())
_ <- waitWhile[GenericContainer[_]](collector, _.isRunning, stopTimeout)
_ <- IO(container.getDockerClient().killContainerCmd(container.getContainerId()).withSignal("TERM").exec())
_ <- waitWhile[GenericContainer[_]](container, _.isRunning, stopTimeout)
} yield {
collector.isRunning() must beFalse
collector.getLogs() must contain("Server terminated")
container.isRunning() must beFalse
container.getLogs() must contain("Server terminated")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ object Collector {
streamGood: String,
streamBad: String,
additionalConfig: Option[String] = None
): Resource[IO, JGenericContainer[_]] = {
): Resource[IO, Collector] = {
val container = GenericContainer(
dockerImage = s"snowplow/scala-stream-collector-kinesis:${ProjectMetadata.version}",
env = Map(
Expand Down Expand Up @@ -67,8 +67,9 @@ object Collector {
Resource.make (
Localstack.createStreams(List(streamGood, streamBad)) *>
IO(startContainerWithLogs(container.container, testName))
.map(c => Collector(c, c.getHost, c.getMappedPort(Collector.port)))
)(
e => IO(e.stop())
c => IO(c.container.stop())
)
}

Expand All @@ -82,3 +83,9 @@ object Collector {
Map("JDK_JAVA_OPTIONS" -> fields)
}
}

case class Collector(
container: JGenericContainer[_],
host: String,
port: Int
)

0 comments on commit 327bec5

Please sign in to comment.