Skip to content
This repository has been archived by the owner on May 9, 2023. It is now read-only.

Commit

Permalink
Fix for KAFKA-4090: Add an SSL check before to create a client (#136)
Browse files Browse the repository at this point in the history
* Add ssl check

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Review fixes

* Fmt

---------

Co-authored-by: Jules Ivanic <guizmaii@users.noreply.github.com>
  • Loading branch information
gurinderu and guizmaii committed Feb 28, 2023
1 parent 099f956 commit 8dfc8dc
Show file tree
Hide file tree
Showing 14 changed files with 286 additions and 12 deletions.
21 changes: 20 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import sbt.Keys.{ fork, parallelExecution }
import sbt.Tests.{ Group, SubProcess }
import scala.sys.process._

import scala.util.Try
Expand Down Expand Up @@ -174,7 +175,9 @@ lazy val zioKafkaTest =
)
else Seq(embeddedKafka)
},
testFrameworks := Seq(new TestFramework("zio.test.sbt.ZTestFramework"))
testFrameworks := Seq(new TestFramework("zio.test.sbt.ZTestFramework")),
Test / fork := true,
Test / testGrouping := groupByMemory((Test / definedTests).value)
)

lazy val zioKafkaBench =
Expand All @@ -185,5 +188,21 @@ lazy val zioKafkaBench =
.settings(publish / skip := true)
.dependsOn(zioKafka)

def groupByMemory(allTests: Seq[TestDefinition]): Seq[Group] = allTests.groupBy { t =>
val regex = """.*Xmx(\d+[mg])""".r
val data = regex
.findAllIn(t.name)
.matchData
.map { v =>
v.group(1)
}
.toList

data match {
case List(x) => (s"Xmx$x", ForkOptions().withRunJVMOptions(Vector(s"-Xmx$x")))
case _ => ("Default", ForkOptions())
}
}.map { case ((name, opts), tests) => Group(name, tests, SubProcess(opts)) }.toSeq

addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt")
addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck")
6 changes: 3 additions & 3 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.0")
addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.4.2")
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0")
addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.3.7")

resolvers ++= Resolver.sonatypeOssRepos("public")
addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.5.11")
addSbtPlugin("dev.zio" % "zio-sbt-website" % "0.3.10")
addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.4.4")
Binary file not shown.
30 changes: 30 additions & 0 deletions zio-kafka-test-utils/src/main/resources/truststore/ca-key
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIFHDBOBgkqhkiG9w0BBQ0wQTApBgkqhkiG9w0BBQwwHAQIzfQ361sbhwcCAggA
MAwGCCqGSIb3DQIJBQAwFAYIKoZIhvcNAwcECFnRtpSC5wpfBIIEyJcYOkR1kbjO
nnq2FuuObw3k2xwRqI/0EVLeQ8WPh3IPCDVlFLJ71S4HVhoUc9wJd+o+pnS3b9IR
fP8Hf8usTH9LYdnA4m8oeJD79sxbZZzRGaphSMm2AfOGxndhxFXj0uscHWj2b35w
vKDbQdicuxiN1oRo8lZWyVEBgBRDOX8azFoAINPPaTCLj1MSf2YByRMz5CXP83iC
ADkogExPJuIigzQT/UrW+cBPMeQQIvcv/acWAmN8NciQIlzsxJ3gCaFRVIkzuJM0
CqY0hscXWRCWYtHjm0hF5SQSUsG+/5dfthXprN3/H5AZn3mqnBby8SvtyhoPmK89
sNy9JbTeA37yDR7ifP8rbIsiMM+WrCTkRLz/SOAlcQ5CyY0DoQiyheKYzTfH8Qr7
fcswFaRET2iQEBkDiTyA+tDHAkAqmRxOgiSyWoo7lmnhp7/DfxSes8Gfsz6JL93z
cidD9zF7/iRO8Ucnakx1AexyqKVylhQtGGQWXivgpP37cxYdP6Y1HJQAu+/tA0tB
6IreAOpL7M+ULL3hbNL/VTqs+QRN4psbO0BJvBOizoxj40bRMDGNd5Jo3tswmsax
/jb82XRW/drjXKNImo1Uk+HzNr3DCLcncSTWNaRrWMIf7mGs5KOi0vKfT6ZCutkF
Am1QPC5tV7taOprvQABl+OXbs44uSFrgbkTZh5TGADVK7X4xlCiW68aBislAb1pP
lp9CBlud+yirFVsQbPvR7rHTYqa8cEM+MfOKND+QHZ6RwtpFWVTuFKICFjHRrd0k
M42sT8IWVNBpQcFZ08U/0tudOU6Fpb+CwC1LVVYztdmhrTSJGsl/JyITO8AMsXk8
0mCh5z4vHSeRv7+1/6YdQ67stUhJOYI25J18Ez1unRmp0otg3VGQ1RAoyaQ0u1KQ
78NlzB3JVRlCKbB798yMsmlvE1Pu0W81W200+6E0EC/29U8lKyLW26UndAC2wMbs
UHsYmLh+L2jeCdaFHEpxMRSysUX8T+ucHH3R0jQwYhFOHVxHpdSH0WU/wTjZUNC3
5+nRTMg2I8nHlfE0/6osi2/y7DxPfGV/cxvybdBfrIgrZ0BXBqEQ2K+1zieqqIKQ
e0rDNByAd/EuyEeRYXA+8efnzzp76pnxPgNa7T3espqBGiWagXfSq0pJpNokSF2n
nqirsu+iReoCGjjP/LbR8vhWTbj940w5uupnK7T0NSuJsU1VjnsC2PmJNFYoHNsc
e/8R0qe2j8vFpPIM06L3z+zXb7w6FBehRXnvdHkdjv6j/jZ0856hLsWLYprQQK87
hBFc9Qod2IM9RHM1DBLJdwTgMdaWi/NzeFU+ycGwk8Bq3f0i7rEN7wBzKWwr8k3Q
cKVomdOqVgvjmMo4KNhBi9c1WuSIl78LYhdb2g011m3PjWjhTobN7V2vWaioeR5U
ls9bA2CXlduebN+58LhGyJQ8HQ1IDFdXJGv/o8JLvlsa5ePSWo9JGUZM6cIjPXu0
oLoGxd+vI6wmhS7QjuHH8jvoe7KiycnINAxrezcoW0jy43iisiT/OCxU2QNvmx81
vjXQ9RoIC20C2XDS2egTjddJFRFOOL23wRXulgtZ11RzrvvqLVoJ5M7xkLcFbq5l
T7LLJiv2KMJkEbZCgZ+pgA==
-----END ENCRYPTED PRIVATE KEY-----
Binary file not shown.
35 changes: 35 additions & 0 deletions zio-kafka-test-utils/src/main/scala/zio/kafka/KafkaTestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,15 @@ import zio.kafka.embedded.Kafka
import zio.kafka.producer._
import zio.kafka.serde.{ Deserializer, Serde, Serializer }

import java.io.File
import java.nio.file.Paths

object KafkaTestUtils {

val trustStoreFile: File = Paths.get(this.getClass.getResource("/truststore/kafka.truststore.jks").toURI).toFile

val keyStoreFile: File = Paths.get(this.getClass.getResource("/keystore/kafka.keystore.jks").toURI).toFile

val producerSettings: ZIO[Kafka, Nothing, ProducerSettings] =
ZIO.serviceWith[Kafka](_.bootstrapServers).map(ProducerSettings(_))

Expand Down Expand Up @@ -186,6 +193,26 @@ object KafkaTestUtils {
)
)

def sslAdminSettings: ZIO[Kafka, Nothing, AdminClientSettings] =
ZIO
.serviceWith[Kafka](_.bootstrapServers)
.flatMap(bootstrap =>
ZIO.attempt {
AdminClientSettings(bootstrap).withProperties(
"security.protocol" -> "SSL",
"ssl.truststore.location" -> trustStoreFile.getAbsolutePath,
"ssl.truststore.password" -> "123456",
"ssl.keystore.location" -> keyStoreFile.getAbsolutePath,
"ssl.keystore.password" -> "123456",
"ssl.key.password" -> "123456",
"ssl.enabled.protocols" -> "TLSv1.2",
"ssl.truststore.type" -> "JKS",
"ssl.keystore.type" -> "JKS"
)
}
)
.orDie

def withAdmin[T](f: AdminClient => RIO[Kafka, T]): ZIO[Kafka, Throwable, T] =
for {
settings <- adminSettings
Expand All @@ -203,6 +230,14 @@ object KafkaTestUtils {
fRes <- withAdminClient(settings)(f)
} yield fRes

def withSslAdmin[T](
f: AdminClient => RIO[Kafka, T]
): ZIO[Kafka, Throwable, T] =
for {
settings <- sslAdminSettings
fRes <- withAdminClient(settings)(f)
} yield fRes

private def withAdminClient[R, T](settings: AdminClientSettings)(f: AdminClient => RIO[R, T]) =
ZIO.scoped[R] {
AdminClient
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package zio.kafka

import zio.ZLayer
import zio.kafka.embedded.Kafka
import zio.test._

trait ZIOSpecWithSslKafka extends ZIOSpec[TestEnvironment with Kafka] with KafkaRandom {
override val bootstrap: ZLayer[Any, Any, TestEnvironment with Kafka] =
testEnvironment ++ Kafka.sslEmbedded
}
34 changes: 34 additions & 0 deletions zio-kafka-test-utils/src/main/scala/zio/kafka/embedded/Kafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ package zio.kafka.embedded

import io.github.embeddedkafka.{ EmbeddedK, EmbeddedKafka, EmbeddedKafkaConfig }
import zio._
import _root_.kafka.server.KafkaConfig
import io.github.embeddedkafka.EmbeddedKafkaConfig.defaultKafkaPort
import org.apache.kafka.common.security.auth.SecurityProtocol

import java.nio.file.Paths

trait Kafka {
def bootstrapServers: List[String]
Expand Down Expand Up @@ -55,5 +60,34 @@ object Kafka {
ZIO.acquireRelease(ZIO.attempt(Kafka.Sasl(EmbeddedKafkaService(EmbeddedKafka.start()))))(_.value.stop())
}

val sslEmbedded: ZLayer[Any, Throwable, Kafka] = ZLayer.scoped {
val listener = s"${SecurityProtocol.SSL}://localhost:$defaultKafkaPort"

val keyStorePath = Paths.get(Kafka.getClass.getResource("/keystore/kafka.keystore.jks").toURI).toFile
val trustStorePath = Paths.get(Kafka.getClass.getResource("/truststore/kafka.truststore.jks").toURI).toFile

implicit val embeddedKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
customBrokerProperties = Map(
"group.min.session.timeout.ms" -> "500",
"group.initial.rebalance.delay.ms" -> "0",
"authorizer.class.name" -> "kafka.security.authorizer.AclAuthorizer",
"super.users" -> "User:ANONYMOUS",
"ssl.client.auth" -> "required",
"ssl.enabled.protocols" -> "TLSv1.2",
"ssl.truststore.type" -> "JKS",
"ssl.keystore.type" -> "JKS",
"ssl.truststore.location" -> trustStorePath.getAbsolutePath,
"ssl.truststore.password" -> "123456",
"ssl.keystore.location" -> keyStorePath.getAbsolutePath,
"ssl.keystore.password" -> "123456",
"ssl.key.password" -> "123456",
KafkaConfig.InterBrokerListenerNameProp -> "SSL",
KafkaConfig.ListenersProp -> listener,
KafkaConfig.AdvertisedListenersProp -> listener
)
)
ZIO.acquireRelease(ZIO.attempt(EmbeddedKafkaService(EmbeddedKafka.start())))(_.stop())
}

val local: ZLayer[Any, Nothing, Kafka] = ZLayer.succeed(DefaultLocal)
}
6 changes: 3 additions & 3 deletions zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
import org.apache.kafka.clients.admin.{ ConfigEntry, RecordsToDelete }
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.{ Node => JNode }
import zio.kafka.{ KafkaTestUtils, ZIOKafkaSpec }
import zio._
import zio.kafka.KafkaTestUtils._
import zio.kafka.admin.AdminClient.{
AlterConfigOp,
Expand All @@ -22,15 +22,15 @@ import zio.kafka.admin.AdminClient.{
TopicPartition
}
import zio.kafka.admin.acl._
import zio.kafka.admin.resource.{ PatternType, ResourcePattern, ResourcePatternFilter, ResourceType }
import zio.kafka.consumer.{ CommittableRecord, Consumer, OffsetBatch, Subscription }
import zio.kafka.embedded.Kafka
import zio.kafka.serde.Serde
import zio.kafka.{ KafkaTestUtils, ZIOKafkaSpec }
import zio.stream.ZSink
import zio.test.Assertion._
import zio.test.TestAspect._
import zio.test._
import zio._
import zio.kafka.admin.resource.{ PatternType, ResourcePattern, ResourcePatternFilter, ResourceType }

import java.util.UUID
import java.util.concurrent.TimeoutException
Expand Down
65 changes: 65 additions & 0 deletions zio-kafka-test/src/test/scala/zio/kafka/OOMSpecXmx300m.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package zio.kafka

import org.apache.kafka.clients.producer.ProducerRecord
import zio.ZIO
import zio.kafka.consumer.{ Consumer, Subscription }
import zio.kafka.embedded.Kafka
import zio.kafka.producer.Producer
import zio.kafka.serde.Serde
import zio.test.TestAspect._
import zio.test._

/**
* This test checks the fix for the issue https://issues.apache.org/jira/browse/KAFKA-4090
*/
object OOMSpecXmx300m extends ZIOSpecWithSslKafka {

override val kafkaPrefix: String = "oom-spec"
override def spec: Spec[Kafka, Any] =
suite("OOM check")(
test("producer should fail with ssl check") {
for {
result <- (for {
topic <- randomTopic
_ <- Producer.produce(new ProducerRecord(topic, "boo", "baa"), Serde.string, Serde.string)
} yield ()).provideSomeLayer(KafkaTestUtils.producer).exit
} yield assertTrue(result.isFailure) &&
assertTrue(
result.toEither.left.map(_.getMessage()) == Left(
"Received an unexpected SSL packet from the server. Please ensure the client is properly configured with SSL enabled"
)
)
},
test("consumer should fail with ssl check") {
for {
result <- (for {
topic <- randomTopic
_ <- ZIO.serviceWithZIO[Consumer](
_.consumeWith(
Subscription.Topics(Set(topic)),
Serde.byteArray,
Serde.byteArray
)(_ => ZIO.unit)
)
} yield ()).provideSomeLayer(KafkaTestUtils.consumer("test")).exit
} yield assertTrue(result.isFailure) &&
assertTrue(
result.toEither.left.map(_.getMessage()) == Left(
"Received an unexpected SSL packet from the server. Please ensure the client is properly configured with SSL enabled"
)
)
},
test("admin client should fail with ssl check") {
for {
result <- (KafkaTestUtils.withAdmin { client =>
client.listTopics()
}).exit
} yield assertTrue(result.isFailure) &&
assertTrue(
result.toEither.left.map(_.getMessage()) == Left(
"Received an unexpected SSL packet from the server. Please ensure the client is properly configured with SSL enabled"
)
)
}
) @@ withLiveClock @@ sequential
}
10 changes: 6 additions & 4 deletions zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ import org.apache.kafka.common.{
Uuid
}
import zio._

import zio.kafka.admin.acl._
import zio.kafka.utils.SslHelper

import java.util.Optional
import scala.annotation.{ nowarn, tailrec }
Expand Down Expand Up @@ -1500,9 +1500,11 @@ object AdminClient {
}

def javaClientFromSettings(settings: AdminClientSettings): ZIO[Scope, Throwable, JAdmin] =
ZIO.acquireRelease(ZIO.attempt(JAdmin.create(settings.driverSettings.asJava)))(client =>
ZIO.succeed(client.close(settings.closeTimeout))
)
ZIO.acquireRelease(
SslHelper.validateEndpoint(settings.bootstrapServers, settings.properties) *> ZIO.attempt(
JAdmin.create(settings.driverSettings.asJava)
)
)(client => ZIO.succeed(client.close(settings.closeTimeout)))

implicit final class MapOps[K1, V1](private val v: Map[K1, V1]) extends AnyVal {
def bimap[K2, V2](fk: K1 => K2, fv: V1 => V2): Map[K2, V2] = v.map(kv => fk(kv._1) -> fv(kv._2))
Expand Down
4 changes: 3 additions & 1 deletion zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package zio.kafka.consumer
import org.apache.kafka.clients.consumer.{ ConsumerRecord, OffsetAndMetadata, OffsetAndTimestamp }
import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo, TopicPartition }
import zio._
import zio.kafka.serde.Deserializer
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.internal.{ ConsumerAccess, Runloop }
import zio.kafka.serde.Deserializer
import zio.kafka.utils.SslHelper
import zio.stream.ZStream.Pull
import zio.stream._

Expand Down Expand Up @@ -349,6 +350,7 @@ object Consumer {
diagnostics: Diagnostics = Diagnostics.NoOp
): ZIO[Scope, Throwable, Consumer] =
for {
_ <- SslHelper.validateEndpoint(settings.bootstrapServers, settings.properties)
wrapper <- ConsumerAccess.make(settings)
runloop <- Runloop(
hasGroupId = settings.hasGroupId,
Expand Down
2 changes: 2 additions & 0 deletions zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.{ Metric, MetricName }
import zio._
import zio.kafka.serde.Serializer
import zio.kafka.utils.SslHelper
import zio.stream.{ ZPipeline, ZStream }

import java.util.concurrent.atomic.AtomicLong
Expand Down Expand Up @@ -247,6 +248,7 @@ object Producer {
def make(settings: ProducerSettings): ZIO[Scope, Throwable, Producer] =
for {
props <- ZIO.attempt(settings.driverSettings)
_ <- SslHelper.validateEndpoint(settings.bootstrapServers, props)
rawProducer <- ZIO.attempt(
new KafkaProducer[Array[Byte], Array[Byte]](
props.asJava,
Expand Down
Loading

0 comments on commit 8dfc8dc

Please sign in to comment.