diff --git a/build.sbt b/build.sbt index 84a7cb62e..4ba84a720 100644 --- a/build.sbt +++ b/build.sbt @@ -1,4 +1,5 @@ import sbt.Keys.{ fork, parallelExecution } +import sbt.Tests.{ Group, SubProcess } import scala.sys.process._ import scala.util.Try @@ -173,8 +174,26 @@ lazy val zioKafkaTest = ) else Seq(embeddedKafka) }, - testFrameworks := Seq(new TestFramework("zio.test.sbt.ZTestFramework")) + testFrameworks := Seq(new TestFramework("zio.test.sbt.ZTestFramework")), + Test / fork := true, + Test / testGrouping := groupByMemory((Test / definedTests).value) ) +def groupByMemory(allTests: Seq[TestDefinition]): Seq[Group] = allTests.groupBy { t => + val regex = """.*Xmx(\d+[mg])""".r + val data = regex + .findAllIn(t.name) + .matchData + .map { v => + v.group(1) + } + .toList + + data match { + case List(x) => (s"Xmx$x", ForkOptions().withRunJVMOptions(Vector(s"-Xmx$x"))) + case _ => ("Default", ForkOptions()) + } +}.map { case ((name, opts), tests) => Group(name, tests, SubProcess(opts)) }.toSeq + addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt") addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck") diff --git a/zio-kafka-test-utils/src/main/resources/keystore/kafka.keystore.jks b/zio-kafka-test-utils/src/main/resources/keystore/kafka.keystore.jks new file mode 100644 index 000000000..2142c078d Binary files /dev/null and b/zio-kafka-test-utils/src/main/resources/keystore/kafka.keystore.jks differ diff --git a/zio-kafka-test-utils/src/main/resources/truststore/ca-key b/zio-kafka-test-utils/src/main/resources/truststore/ca-key new file mode 100644 index 000000000..dff8d516d --- /dev/null +++ b/zio-kafka-test-utils/src/main/resources/truststore/ca-key @@ -0,0 +1,30 @@ +-----BEGIN ENCRYPTED PRIVATE KEY----- +MIIFHDBOBgkqhkiG9w0BBQ0wQTApBgkqhkiG9w0BBQwwHAQIzfQ361sbhwcCAggA +MAwGCCqGSIb3DQIJBQAwFAYIKoZIhvcNAwcECFnRtpSC5wpfBIIEyJcYOkR1kbjO +nnq2FuuObw3k2xwRqI/0EVLeQ8WPh3IPCDVlFLJ71S4HVhoUc9wJd+o+pnS3b9IR +fP8Hf8usTH9LYdnA4m8oeJD79sxbZZzRGaphSMm2AfOGxndhxFXj0uscHWj2b35w +vKDbQdicuxiN1oRo8lZWyVEBgBRDOX8azFoAINPPaTCLj1MSf2YByRMz5CXP83iC +ADkogExPJuIigzQT/UrW+cBPMeQQIvcv/acWAmN8NciQIlzsxJ3gCaFRVIkzuJM0 +CqY0hscXWRCWYtHjm0hF5SQSUsG+/5dfthXprN3/H5AZn3mqnBby8SvtyhoPmK89 +sNy9JbTeA37yDR7ifP8rbIsiMM+WrCTkRLz/SOAlcQ5CyY0DoQiyheKYzTfH8Qr7 +fcswFaRET2iQEBkDiTyA+tDHAkAqmRxOgiSyWoo7lmnhp7/DfxSes8Gfsz6JL93z +cidD9zF7/iRO8Ucnakx1AexyqKVylhQtGGQWXivgpP37cxYdP6Y1HJQAu+/tA0tB +6IreAOpL7M+ULL3hbNL/VTqs+QRN4psbO0BJvBOizoxj40bRMDGNd5Jo3tswmsax +/jb82XRW/drjXKNImo1Uk+HzNr3DCLcncSTWNaRrWMIf7mGs5KOi0vKfT6ZCutkF +Am1QPC5tV7taOprvQABl+OXbs44uSFrgbkTZh5TGADVK7X4xlCiW68aBislAb1pP +lp9CBlud+yirFVsQbPvR7rHTYqa8cEM+MfOKND+QHZ6RwtpFWVTuFKICFjHRrd0k +M42sT8IWVNBpQcFZ08U/0tudOU6Fpb+CwC1LVVYztdmhrTSJGsl/JyITO8AMsXk8 +0mCh5z4vHSeRv7+1/6YdQ67stUhJOYI25J18Ez1unRmp0otg3VGQ1RAoyaQ0u1KQ +78NlzB3JVRlCKbB798yMsmlvE1Pu0W81W200+6E0EC/29U8lKyLW26UndAC2wMbs +UHsYmLh+L2jeCdaFHEpxMRSysUX8T+ucHH3R0jQwYhFOHVxHpdSH0WU/wTjZUNC3 +5+nRTMg2I8nHlfE0/6osi2/y7DxPfGV/cxvybdBfrIgrZ0BXBqEQ2K+1zieqqIKQ +e0rDNByAd/EuyEeRYXA+8efnzzp76pnxPgNa7T3espqBGiWagXfSq0pJpNokSF2n +nqirsu+iReoCGjjP/LbR8vhWTbj940w5uupnK7T0NSuJsU1VjnsC2PmJNFYoHNsc +e/8R0qe2j8vFpPIM06L3z+zXb7w6FBehRXnvdHkdjv6j/jZ0856hLsWLYprQQK87 +hBFc9Qod2IM9RHM1DBLJdwTgMdaWi/NzeFU+ycGwk8Bq3f0i7rEN7wBzKWwr8k3Q +cKVomdOqVgvjmMo4KNhBi9c1WuSIl78LYhdb2g011m3PjWjhTobN7V2vWaioeR5U +ls9bA2CXlduebN+58LhGyJQ8HQ1IDFdXJGv/o8JLvlsa5ePSWo9JGUZM6cIjPXu0 +oLoGxd+vI6wmhS7QjuHH8jvoe7KiycnINAxrezcoW0jy43iisiT/OCxU2QNvmx81 +vjXQ9RoIC20C2XDS2egTjddJFRFOOL23wRXulgtZ11RzrvvqLVoJ5M7xkLcFbq5l +T7LLJiv2KMJkEbZCgZ+pgA== +-----END ENCRYPTED PRIVATE KEY----- diff --git a/zio-kafka-test-utils/src/main/resources/truststore/kafka.truststore.jks b/zio-kafka-test-utils/src/main/resources/truststore/kafka.truststore.jks new file mode 100644 index 000000000..bffabd867 Binary files /dev/null and b/zio-kafka-test-utils/src/main/resources/truststore/kafka.truststore.jks differ diff --git a/zio-kafka-test-utils/src/main/scala/zio/kafka/KafkaTestUtils.scala b/zio-kafka-test-utils/src/main/scala/zio/kafka/KafkaTestUtils.scala index aefcccbbc..e36004c90 100644 --- a/zio-kafka-test-utils/src/main/scala/zio/kafka/KafkaTestUtils.scala +++ b/zio-kafka-test-utils/src/main/scala/zio/kafka/KafkaTestUtils.scala @@ -11,8 +11,15 @@ import zio.kafka.embedded.Kafka import zio.kafka.producer._ import zio.kafka.serde.{ Deserializer, Serde, Serializer } +import java.io.File +import java.nio.file.Paths + object KafkaTestUtils { + val trustStoreFile: File = Paths.get(this.getClass.getResource("/truststore/kafka.truststore.jks").toURI).toFile + + val keyStoreFile: File = Paths.get(this.getClass.getResource("/keystore/kafka.keystore.jks").toURI).toFile + val producerSettings: ZIO[Kafka, Nothing, ProducerSettings] = ZIO.serviceWith[Kafka](_.bootstrapServers).map(ProducerSettings(_)) @@ -169,6 +176,26 @@ object KafkaTestUtils { ) ) + def sslAdminSettings: ZIO[Kafka, Nothing, AdminClientSettings] = + ZIO + .serviceWith[Kafka](_.bootstrapServers) + .flatMap(bootstrap => + ZIO.attempt { + AdminClientSettings(bootstrap).withProperties( + "security.protocol" -> "SSL", + "ssl.truststore.location" -> trustStoreFile.getAbsolutePath, + "ssl.truststore.password" -> "123456", + "ssl.keystore.location" -> keyStoreFile.getAbsolutePath, + "ssl.keystore.password" -> "123456", + "ssl.key.password" -> "123456", + "ssl.enabled.protocols" -> "TLSv1.2", + "ssl.truststore.type" -> "JKS", + "ssl.keystore.type" -> "JKS" + ) + } + ) + .orDie + def withAdmin[T](f: AdminClient => RIO[Kafka, T]): ZIO[Kafka, Throwable, T] = for { settings <- adminSettings @@ -186,6 +213,14 @@ object KafkaTestUtils { fRes <- withAdminClient(settings)(f) } yield fRes + def withSslAdmin[T]( + f: AdminClient => RIO[Kafka, T] + ): ZIO[Kafka, Throwable, T] = + for { + settings <- sslAdminSettings + fRes <- withAdminClient(settings)(f) + } yield fRes + private def withAdminClient[R, T](settings: AdminClientSettings)(f: AdminClient => RIO[R, T]) = ZIO.scoped[R] { AdminClient diff --git a/zio-kafka-test-utils/src/main/scala/zio/kafka/ZIOSpecWithSslKafka.scala b/zio-kafka-test-utils/src/main/scala/zio/kafka/ZIOSpecWithSslKafka.scala new file mode 100644 index 000000000..3995b1153 --- /dev/null +++ b/zio-kafka-test-utils/src/main/scala/zio/kafka/ZIOSpecWithSslKafka.scala @@ -0,0 +1,10 @@ +package zio.kafka + +import zio.ZLayer +import zio.kafka.embedded.Kafka +import zio.test._ + +trait ZIOSpecWithSslKafka extends ZIOSpec[TestEnvironment with Kafka] with KafkaRandom { + override val bootstrap: ZLayer[Any, Any, TestEnvironment with Kafka] = + testEnvironment ++ Kafka.sslEmbedded +} diff --git a/zio-kafka-test-utils/src/main/scala/zio/kafka/embedded/Kafka.scala b/zio-kafka-test-utils/src/main/scala/zio/kafka/embedded/Kafka.scala index 519b7684c..55a37f233 100644 --- a/zio-kafka-test-utils/src/main/scala/zio/kafka/embedded/Kafka.scala +++ b/zio-kafka-test-utils/src/main/scala/zio/kafka/embedded/Kafka.scala @@ -2,6 +2,11 @@ package zio.kafka.embedded import io.github.embeddedkafka.{ EmbeddedK, EmbeddedKafka, EmbeddedKafkaConfig } import zio._ +import _root_.kafka.server.KafkaConfig +import io.github.embeddedkafka.EmbeddedKafkaConfig.defaultKafkaPort +import org.apache.kafka.common.security.auth.SecurityProtocol + +import java.nio.file.Paths trait Kafka { def bootstrapServers: List[String] @@ -55,5 +60,34 @@ object Kafka { ZIO.acquireRelease(ZIO.attempt(Kafka.Sasl(EmbeddedKafkaService(EmbeddedKafka.start()))))(_.value.stop()) } + val sslEmbedded: ZLayer[Any, Throwable, Kafka] = ZLayer.scoped { + val listener = s"${SecurityProtocol.SSL}://localhost:$defaultKafkaPort" + + val keyStorePath = Paths.get(Kafka.getClass.getResource("/keystore/kafka.keystore.jks").toURI).toFile + val trustStorePath = Paths.get(Kafka.getClass.getResource("/truststore/kafka.truststore.jks").toURI).toFile + + implicit val embeddedKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig( + customBrokerProperties = Map( + "group.min.session.timeout.ms" -> "500", + "group.initial.rebalance.delay.ms" -> "0", + "authorizer.class.name" -> "kafka.security.authorizer.AclAuthorizer", + "super.users" -> "User:ANONYMOUS", + "ssl.client.auth" -> "required", + "ssl.enabled.protocols" -> "TLSv1.2", + "ssl.truststore.type" -> "JKS", + "ssl.keystore.type" -> "JKS", + "ssl.truststore.location" -> trustStorePath.getAbsolutePath, + "ssl.truststore.password" -> "123456", + "ssl.keystore.location" -> keyStorePath.getAbsolutePath, + "ssl.keystore.password" -> "123456", + "ssl.key.password" -> "123456", + KafkaConfig.InterBrokerListenerNameProp -> "SSL", + KafkaConfig.ListenersProp -> listener, + KafkaConfig.AdvertisedListenersProp -> listener + ) + ) + ZIO.acquireRelease(ZIO.attempt(EmbeddedKafkaService(EmbeddedKafka.start())))(_.stop()) + } + val local: ZLayer[Any, Nothing, Kafka] = ZLayer.succeed(DefaultLocal) } diff --git a/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala index 75ee4fc0f..f63937728 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala @@ -4,7 +4,7 @@ import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource import org.apache.kafka.clients.admin.{ ConfigEntry, RecordsToDelete } import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.{ Node => JNode } -import zio.kafka.{ KafkaTestUtils, ZIOKafkaSpec } +import zio._ import zio.kafka.KafkaTestUtils._ import zio.kafka.admin.AdminClient.{ AlterConfigOp, @@ -22,15 +22,15 @@ import zio.kafka.admin.AdminClient.{ TopicPartition } import zio.kafka.admin.acl._ +import zio.kafka.admin.resource.{ PatternType, ResourcePattern, ResourcePatternFilter, ResourceType } import zio.kafka.consumer.{ CommittableRecord, Consumer, OffsetBatch, Subscription } import zio.kafka.embedded.Kafka import zio.kafka.serde.Serde +import zio.kafka.{ KafkaTestUtils, ZIOKafkaSpec } import zio.stream.ZSink import zio.test.Assertion._ import zio.test.TestAspect._ import zio.test._ -import zio._ -import zio.kafka.admin.resource.{ PatternType, ResourcePattern, ResourcePatternFilter, ResourceType } import java.util.UUID import java.util.concurrent.TimeoutException diff --git a/zio-kafka-test/src/test/scala/zio/kafka/OOMSpecXmx300m.scala b/zio-kafka-test/src/test/scala/zio/kafka/OOMSpecXmx300m.scala new file mode 100644 index 000000000..6456d581d --- /dev/null +++ b/zio-kafka-test/src/test/scala/zio/kafka/OOMSpecXmx300m.scala @@ -0,0 +1,58 @@ +package zio.kafka + +import org.apache.kafka.clients.producer.ProducerRecord +import zio.kafka.consumer.{ Consumer, Subscription } +import zio.kafka.embedded.Kafka +import zio.kafka.producer.Producer +import zio.kafka.serde.Serde +import zio.test.TestAspect._ +import zio.test._ + +/** + * This test checks the fix for the issue https://issues.apache.org/jira/browse/KAFKA-4090 + */ +object OOMSpecXmx300m extends ZIOSpecWithSslKafka { + + override val kafkaPrefix: String = "oom-spec" + override def spec: Spec[Kafka, Any] = + suite("OOM check")( + test("producer should fail with ssl check") { + for { + result <- (for { + topic <- randomTopic + _ <- Producer.produce(new ProducerRecord(topic, "boo", "baa"), Serde.string, Serde.string) + } yield ()).provideSomeLayer(KafkaTestUtils.producer).exit + } yield assertTrue(result.isFailure) && + assertTrue( + result.toEither.left.map(_.getMessage()) == Left( + "Received an unexpected SSL packet from the server. Please ensure the client is properly configured with SSL enabled" + ) + ) + }, + test("consumer should fail with ssl check") { + for { + result <- (for { + topic <- randomTopic + _ <- Consumer.subscribe(Subscription.Topics(Set(topic))) + } yield ()).provideSomeLayer(KafkaTestUtils.consumer("test")).exit + } yield assertTrue(result.isFailure) && + assertTrue( + result.toEither.left.map(_.getMessage()) == Left( + "Received an unexpected SSL packet from the server. Please ensure the client is properly configured with SSL enabled" + ) + ) + }, + test("admin client should fail with ssl check") { + for { + result <- (KafkaTestUtils.withAdmin { client => + client.listTopics() + }).exit + } yield assertTrue(result.isFailure) && + assertTrue( + result.toEither.left.map(_.getMessage()) == Left( + "Received an unexpected SSL packet from the server. Please ensure the client is properly configured with SSL enabled" + ) + ) + } + ) @@ withLiveClock @@ sequential +} diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala b/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala index 863a0e59f..f1341b808 100644 --- a/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala +++ b/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala @@ -49,8 +49,8 @@ import org.apache.kafka.common.{ Uuid } import zio._ - import zio.kafka.admin.acl._ +import zio.kafka.utils.SslHelper import java.util.Optional import scala.annotation.{ nowarn, tailrec } @@ -1500,9 +1500,11 @@ object AdminClient { } def javaClientFromSettings(settings: AdminClientSettings): ZIO[Scope, Throwable, JAdmin] = - ZIO.acquireRelease(ZIO.attempt(JAdmin.create(settings.driverSettings.asJava)))(client => - ZIO.succeed(client.close(settings.closeTimeout)) - ) + ZIO.acquireRelease( + SslHelper.validateEndpoint(settings.bootstrapServers, settings.properties) *> ZIO.attempt( + JAdmin.create(settings.driverSettings.asJava) + ) + )(client => ZIO.succeed(client.close(settings.closeTimeout))) implicit final class MapOps[K1, V1](val v: Map[K1, V1]) extends AnyVal { def bimap[K2, V2](fk: K1 => K2, fv: V1 => V2): Map[K2, V2] = v.map(kv => fk(kv._1) -> fv(kv._2)) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index da58fb6b3..5c8d05340 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -3,9 +3,10 @@ package zio.kafka.consumer import org.apache.kafka.clients.consumer.{ OffsetAndMetadata, OffsetAndTimestamp } import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo, TopicPartition } import zio._ -import zio.kafka.serde.Deserializer import zio.kafka.consumer.diagnostics.Diagnostics import zio.kafka.consumer.internal.{ ConsumerAccess, Runloop } +import zio.kafka.serde.Deserializer +import zio.kafka.utils.SslHelper import zio.stream.ZStream.Pull import zio.stream._ @@ -349,6 +350,7 @@ object Consumer { diagnostics: Diagnostics = Diagnostics.NoOp ): ZIO[Scope, Throwable, Consumer] = for { + _ <- SslHelper.validateEndpoint(settings.bootstrapServers, settings.properties) wrapper <- ConsumerAccess.make(settings) runloop <- Runloop( settings.hasGroupId, diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala index 35b10e27a..050aed1d5 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala @@ -5,6 +5,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.kafka.common.{ Metric, MetricName } import zio._ import zio.kafka.serde.Serializer +import zio.kafka.utils.SslHelper import zio.stream.{ ZPipeline, ZStream } import java.util.concurrent.atomic.AtomicLong @@ -247,6 +248,7 @@ object Producer { def make(settings: ProducerSettings): ZIO[Scope, Throwable, Producer] = for { props <- ZIO.attempt(settings.driverSettings) + _ <- SslHelper.validateEndpoint(settings.bootstrapServers, props) rawProducer <- ZIO.attempt( new KafkaProducer[Array[Byte], Array[Byte]]( props.asJava, diff --git a/zio-kafka/src/main/scala/zio/kafka/utils/SslHelper.scala b/zio-kafka/src/main/scala/zio/kafka/utils/SslHelper.scala new file mode 100644 index 000000000..7dc865b46 --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/utils/SslHelper.scala @@ -0,0 +1,75 @@ +package zio.kafka.utils + +import org.apache.kafka.clients.{ ClientDnsLookup, ClientUtils } +import zio.{ Task, ZIO } + +import java.nio.ByteBuffer +import java.nio.channels.SocketChannel +import scala.jdk.CollectionConverters._ + +/** + * This function validates that your Kafka client (Admin, Consumer, or Producer) configurations are valid for the Kafka + * Cluster you want to contact. + * + * This function protects you against this long standing bug in kafka-clients that leads to crash your app with an OOM. + * More details, see: https://issues.apache.org/jira/browse/KAFKA-4090 + */ +object SslHelper { + def validateEndpoint(bootstrapServers: List[String], props: Map[String, AnyRef]): Task[Unit] = + ZIO + .unless( + props + .get("security.protocol") + .exists { + case x: String if x.toUpperCase().contains("SSL") => true + case _ => false + } + ) { + ZIO.blocking { + for { + address <- ZIO.attempt { + ClientUtils + .parseAndValidateAddresses(bootstrapServers.asJava, ClientDnsLookup.USE_ALL_DNS_IPS) + .asScala + .toList + } + _ <- ZIO.foreachParDiscard(address) { addr => + ZIO.scoped { + for { + channel <- ZIO.acquireRelease( + ZIO.attempt(SocketChannel.open(addr)) + )(channel => ZIO.attempt(channel.close()).orDie) + tls <- ZIO.attempt { + // make a simple request here and validate a server response + val buf = ByteBuffer.allocate(5) + channel.write(buf) + buf.position(0) + channel.read(buf) + buf.position(0) + isTls(buf) + } + _ <- + ZIO.when(tls)( + ZIO.fail( + new IllegalArgumentException( + s"Received an unexpected SSL packet from the server. Please ensure the client is properly configured with SSL enabled" + ) + ) + ) + } yield () + } + } + } yield () + } + } + .unit + + private def isTls(buf: ByteBuffer): Boolean = { + val tlsMessageType = buf.get() + tlsMessageType match { + case 20 | 21 | 22 | 23 | 255 => + true + case _ => tlsMessageType >= 128 + } + } +}