Skip to content
This repository has been archived by the owner on May 9, 2023. It is now read-only.

Commit

Permalink
Add ssl check
Browse files Browse the repository at this point in the history
  • Loading branch information
Nick committed Jan 20, 2023
1 parent d1cd3e6 commit 3394814
Show file tree
Hide file tree
Showing 14 changed files with 467 additions and 9 deletions.
21 changes: 20 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import sbt.Keys.{ fork, parallelExecution }
import sbt.Tests.{ Group, SubProcess }
import scala.sys.process._

import scala.util.Try
Expand Down Expand Up @@ -173,8 +174,26 @@ lazy val zioKafkaTest =
)
else Seq(embeddedKafka)
},
testFrameworks := Seq(new TestFramework("zio.test.sbt.ZTestFramework"))
testFrameworks := Seq(new TestFramework("zio.test.sbt.ZTestFramework")),
Test / fork := true,
Test / testGrouping := groupByMemory((Test / definedTests).value)
)

def groupByMemory(allTests: Seq[TestDefinition]): Seq[Group] = allTests.groupBy { t =>
val regex = """.*Xmx(\d+[mg])""".r
val data = regex
.findAllIn(t.name)
.matchData
.map { v =>
v.group(1)
}
.toList

data match {
case List(x) => (s"Xmx$x", ForkOptions().withRunJVMOptions(Vector(s"-Xmx$x")))
case _ => ("Default", ForkOptions())
}
}.map { case ((name, opts), tests) => Group(name, tests, SubProcess(opts)) }.toSeq

addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt")
addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck")
207 changes: 207 additions & 0 deletions zio-kafka-test-utils/src/main/resources/kafka-generate-ssl.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
#!/usr/bin/env bash

set -e

KEYSTORE_FILENAME="kafka.keystore.jks"
VALIDITY_IN_DAYS=3650
DEFAULT_TRUSTSTORE_FILENAME="kafka.truststore.jks"
TRUSTSTORE_WORKING_DIRECTORY="truststore"
KEYSTORE_WORKING_DIRECTORY="keystore"
CA_CERT_FILE="ca-cert"
KEYSTORE_SIGN_REQUEST="cert-file"
KEYSTORE_SIGN_REQUEST_SRL="ca-cert.srl"
KEYSTORE_SIGNED_CERT="cert-signed"

function file_exists_and_exit() {
echo "'$1' cannot exist. Move or delete it before"
echo "re-running this script."
exit 1
}

if [ -e "$KEYSTORE_WORKING_DIRECTORY" ]; then
file_exists_and_exit $KEYSTORE_WORKING_DIRECTORY
fi

if [ -e "$CA_CERT_FILE" ]; then
file_exists_and_exit $CA_CERT_FILE
fi

if [ -e "$KEYSTORE_SIGN_REQUEST" ]; then
file_exists_and_exit $KEYSTORE_SIGN_REQUEST
fi

if [ -e "$KEYSTORE_SIGN_REQUEST_SRL" ]; then
file_exists_and_exit $KEYSTORE_SIGN_REQUEST_SRL
fi

if [ -e "$KEYSTORE_SIGNED_CERT" ]; then
file_exists_and_exit $KEYSTORE_SIGNED_CERT
fi

echo
echo "Welcome to the Kafka SSL keystore and truststore generator script."

echo
echo "First, do you need to generate a trust store and associated private key,"
echo "or do you already have a trust store file and private key?"
echo
echo -n "Do you need to generate a trust store and associated private key? [yn] "
read generate_trust_store

trust_store_file=""
trust_store_private_key_file=""

if [ "$generate_trust_store" == "y" ]; then
if [ -e "$TRUSTSTORE_WORKING_DIRECTORY" ]; then
file_exists_and_exit $TRUSTSTORE_WORKING_DIRECTORY
fi

mkdir $TRUSTSTORE_WORKING_DIRECTORY
echo
echo "OK, we'll generate a trust store and associated private key."
echo
echo "First, the private key."
echo
echo "You will be prompted for:"
echo " - A password for the private key. Remember this."
echo " - Information about you and your company."
echo " - NOTE that the Common Name (CN) is currently not important."

openssl req -new -x509 -keyout $TRUSTSTORE_WORKING_DIRECTORY/ca-key \
-out $TRUSTSTORE_WORKING_DIRECTORY/$CA_CERT_FILE -days $VALIDITY_IN_DAYS

trust_store_private_key_file="$TRUSTSTORE_WORKING_DIRECTORY/ca-key"

echo
echo "Two files were created:"
echo " - $TRUSTSTORE_WORKING_DIRECTORY/ca-key -- the private key used later to"
echo " sign certificates"
echo " - $TRUSTSTORE_WORKING_DIRECTORY/$CA_CERT_FILE -- the certificate that will be"
echo " stored in the trust store in a moment and serve as the certificate"
echo " authority (CA). Once this certificate has been stored in the trust"
echo " store, it will be deleted. It can be retrieved from the trust store via:"
echo " $ keytool -keystore <trust-store-file> -export -alias CARoot -rfc"

echo
echo "Now the trust store will be generated from the certificate."
echo
echo "You will be prompted for:"
echo " - the trust store's password (labeled 'keystore'). Remember this"
echo " - a confirmation that you want to import the certificate"

keytool -keystore $TRUSTSTORE_WORKING_DIRECTORY/$DEFAULT_TRUSTSTORE_FILENAME \
-alias CARoot -import -file $TRUSTSTORE_WORKING_DIRECTORY/$CA_CERT_FILE

trust_store_file="$TRUSTSTORE_WORKING_DIRECTORY/$DEFAULT_TRUSTSTORE_FILENAME"

echo
echo "$TRUSTSTORE_WORKING_DIRECTORY/$DEFAULT_TRUSTSTORE_FILENAME was created."

# don't need the cert because it's in the trust store.
rm $TRUSTSTORE_WORKING_DIRECTORY/$CA_CERT_FILE
else
echo
echo -n "Enter the path of the trust store file. "
read -e trust_store_file

if ! [ -f $trust_store_file ]; then
echo "$trust_store_file isn't a file. Exiting."
exit 1
fi

echo -n "Enter the path of the trust store's private key. "
read -e trust_store_private_key_file

if ! [ -f $trust_store_private_key_file ]; then
echo "$trust_store_private_key_file isn't a file. Exiting."
exit 1
fi
fi

echo
echo "Continuing with:"
echo " - trust store file: $trust_store_file"
echo " - trust store private key: $trust_store_private_key_file"

mkdir $KEYSTORE_WORKING_DIRECTORY

echo
echo "Now, a keystore will be generated. Each broker and logical client needs its own"
echo "keystore. This script will create only one keystore. Run this script multiple"
echo "times for multiple keystores."
echo
echo "You will be prompted for the following:"
echo " - A keystore password. Remember it."
echo " - Personal information, such as your name."
echo " NOTE: currently in Kafka, the Common Name (CN) does not need to be the FQDN of"
echo " this host. However, at some point, this may change. As such, make the CN"
echo " the FQDN. Some operating systems call the CN prompt 'first / last name'"
echo " - A key password, for the key being generated within the keystore. Remember this."

# To learn more about CNs and FQDNs, read:
# https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/X509ExtendedTrustManager.html

keytool -keystore $KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME \
-alias localhost -validity $VALIDITY_IN_DAYS -genkey -keyalg RSA

echo
echo "'$KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME' now contains a key pair and a"
echo "self-signed certificate. Again, this keystore can only be used for one broker or"
echo "one logical client. Other brokers or clients need to generate their own keystores."

echo
echo "Fetching the certificate from the trust store and storing in $CA_CERT_FILE."
echo
echo "You will be prompted for the trust store's password (labeled 'keystore')"

keytool -keystore $trust_store_file -export -alias CARoot -rfc -file $CA_CERT_FILE

echo
echo "Now a certificate signing request will be made to the keystore."
echo
echo "You will be prompted for the keystore's password."
keytool -keystore $KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME -alias localhost \
-certreq -file $KEYSTORE_SIGN_REQUEST

echo
echo "Now the trust store's private key (CA) will sign the keystore's certificate."
echo
echo "You will be prompted for the trust store's private key password."
openssl x509 -req -CA $CA_CERT_FILE -CAkey $trust_store_private_key_file \
-in $KEYSTORE_SIGN_REQUEST -out $KEYSTORE_SIGNED_CERT \
-days $VALIDITY_IN_DAYS -CAcreateserial
# creates $KEYSTORE_SIGN_REQUEST_SRL which is never used or needed.

echo
echo "Now the CA will be imported into the keystore."
echo
echo "You will be prompted for the keystore's password and a confirmation that you want to"
echo "import the certificate."
keytool -keystore $KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME -alias CARoot \
-import -file $CA_CERT_FILE
rm $CA_CERT_FILE # delete the trust store cert because it's stored in the trust store.

echo
echo "Now the keystore's signed certificate will be imported back into the keystore."
echo
echo "You will be prompted for the keystore's password."
keytool -keystore $KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME -alias localhost -import \
-file $KEYSTORE_SIGNED_CERT

echo
echo "All done!"
echo
echo "Delete intermediate files? They are:"
echo " - '$KEYSTORE_SIGN_REQUEST_SRL': CA serial number"
echo " - '$KEYSTORE_SIGN_REQUEST': the keystore's certificate signing request"
echo " (that was fulfilled)"
echo " - '$KEYSTORE_SIGNED_CERT': the keystore's certificate, signed by the CA, and stored back"
echo " into the keystore"
echo -n "Delete? [yn] "
read delete_intermediate_files

if [ "$delete_intermediate_files" == "y" ]; then
rm $KEYSTORE_SIGN_REQUEST_SRL
rm $KEYSTORE_SIGN_REQUEST
rm $KEYSTORE_SIGNED_CERT
fi
Binary file not shown.
30 changes: 30 additions & 0 deletions zio-kafka-test-utils/src/main/resources/truststore/ca-key
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIFHDBOBgkqhkiG9w0BBQ0wQTApBgkqhkiG9w0BBQwwHAQIzfQ361sbhwcCAggA
MAwGCCqGSIb3DQIJBQAwFAYIKoZIhvcNAwcECFnRtpSC5wpfBIIEyJcYOkR1kbjO
nnq2FuuObw3k2xwRqI/0EVLeQ8WPh3IPCDVlFLJ71S4HVhoUc9wJd+o+pnS3b9IR
fP8Hf8usTH9LYdnA4m8oeJD79sxbZZzRGaphSMm2AfOGxndhxFXj0uscHWj2b35w
vKDbQdicuxiN1oRo8lZWyVEBgBRDOX8azFoAINPPaTCLj1MSf2YByRMz5CXP83iC
ADkogExPJuIigzQT/UrW+cBPMeQQIvcv/acWAmN8NciQIlzsxJ3gCaFRVIkzuJM0
CqY0hscXWRCWYtHjm0hF5SQSUsG+/5dfthXprN3/H5AZn3mqnBby8SvtyhoPmK89
sNy9JbTeA37yDR7ifP8rbIsiMM+WrCTkRLz/SOAlcQ5CyY0DoQiyheKYzTfH8Qr7
fcswFaRET2iQEBkDiTyA+tDHAkAqmRxOgiSyWoo7lmnhp7/DfxSes8Gfsz6JL93z
cidD9zF7/iRO8Ucnakx1AexyqKVylhQtGGQWXivgpP37cxYdP6Y1HJQAu+/tA0tB
6IreAOpL7M+ULL3hbNL/VTqs+QRN4psbO0BJvBOizoxj40bRMDGNd5Jo3tswmsax
/jb82XRW/drjXKNImo1Uk+HzNr3DCLcncSTWNaRrWMIf7mGs5KOi0vKfT6ZCutkF
Am1QPC5tV7taOprvQABl+OXbs44uSFrgbkTZh5TGADVK7X4xlCiW68aBislAb1pP
lp9CBlud+yirFVsQbPvR7rHTYqa8cEM+MfOKND+QHZ6RwtpFWVTuFKICFjHRrd0k
M42sT8IWVNBpQcFZ08U/0tudOU6Fpb+CwC1LVVYztdmhrTSJGsl/JyITO8AMsXk8
0mCh5z4vHSeRv7+1/6YdQ67stUhJOYI25J18Ez1unRmp0otg3VGQ1RAoyaQ0u1KQ
78NlzB3JVRlCKbB798yMsmlvE1Pu0W81W200+6E0EC/29U8lKyLW26UndAC2wMbs
UHsYmLh+L2jeCdaFHEpxMRSysUX8T+ucHH3R0jQwYhFOHVxHpdSH0WU/wTjZUNC3
5+nRTMg2I8nHlfE0/6osi2/y7DxPfGV/cxvybdBfrIgrZ0BXBqEQ2K+1zieqqIKQ
e0rDNByAd/EuyEeRYXA+8efnzzp76pnxPgNa7T3espqBGiWagXfSq0pJpNokSF2n
nqirsu+iReoCGjjP/LbR8vhWTbj940w5uupnK7T0NSuJsU1VjnsC2PmJNFYoHNsc
e/8R0qe2j8vFpPIM06L3z+zXb7w6FBehRXnvdHkdjv6j/jZ0856hLsWLYprQQK87
hBFc9Qod2IM9RHM1DBLJdwTgMdaWi/NzeFU+ycGwk8Bq3f0i7rEN7wBzKWwr8k3Q
cKVomdOqVgvjmMo4KNhBi9c1WuSIl78LYhdb2g011m3PjWjhTobN7V2vWaioeR5U
ls9bA2CXlduebN+58LhGyJQ8HQ1IDFdXJGv/o8JLvlsa5ePSWo9JGUZM6cIjPXu0
oLoGxd+vI6wmhS7QjuHH8jvoe7KiycnINAxrezcoW0jy43iisiT/OCxU2QNvmx81
vjXQ9RoIC20C2XDS2egTjddJFRFOOL23wRXulgtZ11RzrvvqLVoJ5M7xkLcFbq5l
T7LLJiv2KMJkEbZCgZ+pgA==
-----END ENCRYPTED PRIVATE KEY-----
Binary file not shown.
33 changes: 33 additions & 0 deletions zio-kafka-test-utils/src/main/scala/zio/kafka/KafkaTestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import zio.kafka.embedded.Kafka
import zio.kafka.producer._
import zio.kafka.serde.{ Deserializer, Serde, Serializer }

import java.nio.file.Paths

object KafkaTestUtils {

val producerSettings: ZIO[Kafka, Nothing, ProducerSettings] =
Expand Down Expand Up @@ -169,6 +171,29 @@ object KafkaTestUtils {
)
)

def sslAdminSettings: ZIO[Kafka, Nothing, AdminClientSettings] =
ZIO
.serviceWith[Kafka](_.bootstrapServers)
.flatMap(bootstrap =>
ZIO.attempt {
val trustStorePath = Paths.get(Kafka.getClass.getResource("/truststore/kafka.truststore.jks").toURI).toFile
val keyStorePath = Paths.get(Kafka.getClass.getResource("/keystore/kafka.keystore.jks").toURI).toFile

AdminClientSettings(bootstrap).withProperties(
"security.protocol" -> "SSL",
"ssl.truststore.location" -> trustStorePath.getAbsolutePath,
"ssl.truststore.password" -> "123456",
"ssl.keystore.location" -> keyStorePath.getAbsolutePath,
"ssl.keystore.password" -> "123456",
"ssl.key.password" -> "123456",
"ssl.enabled.protocols" -> "TLSv1.2",
"ssl.truststore.type" -> "JKS",
"ssl.keystore.type" -> "JKS"
)
}
)
.orDie

def withAdmin[T](f: AdminClient => RIO[Kafka, T]): ZIO[Kafka, Throwable, T] =
for {
settings <- adminSettings
Expand All @@ -186,6 +211,14 @@ object KafkaTestUtils {
fRes <- withAdminClient(settings)(f)
} yield fRes

def withSslAdmin[T](
f: AdminClient => RIO[Kafka, T]
): ZIO[Kafka, Throwable, T] =
for {
settings <- sslAdminSettings
fRes <- withAdminClient(settings)(f)
} yield fRes

private def withAdminClient[R, T](settings: AdminClientSettings)(f: AdminClient => RIO[R, T]) =
ZIO.scoped[R] {
AdminClient
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package zio.kafka

import zio.ZLayer
import zio.kafka.embedded.Kafka
import zio.test._

trait ZIOSpecWithSslKafka extends ZIOSpec[TestEnvironment with Kafka] with KafkaRandom {
override val bootstrap: ZLayer[Any, Any, TestEnvironment with Kafka] =
testEnvironment ++ Kafka.sslEmbedded
}
34 changes: 34 additions & 0 deletions zio-kafka-test-utils/src/main/scala/zio/kafka/embedded/Kafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ package zio.kafka.embedded

import io.github.embeddedkafka.{ EmbeddedK, EmbeddedKafka, EmbeddedKafkaConfig }
import zio._
import _root_.kafka.server.KafkaConfig
import io.github.embeddedkafka.EmbeddedKafkaConfig.defaultKafkaPort
import org.apache.kafka.common.security.auth.SecurityProtocol

import java.nio.file.Paths

trait Kafka {
def bootstrapServers: List[String]
Expand Down Expand Up @@ -55,5 +60,34 @@ object Kafka {
ZIO.acquireRelease(ZIO.attempt(Kafka.Sasl(EmbeddedKafkaService(EmbeddedKafka.start()))))(_.value.stop())
}

val sslEmbedded: ZLayer[Any, Throwable, Kafka] = ZLayer.scoped {
val listener = s"${SecurityProtocol.SSL}://localhost:$defaultKafkaPort"

val keyStorePath = Paths.get(Kafka.getClass.getResource("/keystore/kafka.keystore.jks").toURI).toFile
val trustStorePath = Paths.get(Kafka.getClass.getResource("/truststore/kafka.truststore.jks").toURI).toFile

implicit val embeddedKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
customBrokerProperties = Map(
"group.min.session.timeout.ms" -> "500",
"group.initial.rebalance.delay.ms" -> "0",
"authorizer.class.name" -> "kafka.security.authorizer.AclAuthorizer",
"super.users" -> "User:ANONYMOUS",
"ssl.client.auth" -> "required",
"ssl.enabled.protocols" -> "TLSv1.2",
"ssl.truststore.type" -> "JKS",
"ssl.keystore.type" -> "JKS",
"ssl.truststore.location" -> trustStorePath.getAbsolutePath,
"ssl.truststore.password" -> "123456",
"ssl.keystore.location" -> keyStorePath.getAbsolutePath,
"ssl.keystore.password" -> "123456",
"ssl.key.password" -> "123456",
KafkaConfig.InterBrokerListenerNameProp -> "SSL",
KafkaConfig.ListenersProp -> listener,
KafkaConfig.AdvertisedListenersProp -> listener
)
)
ZIO.acquireRelease(ZIO.attempt(EmbeddedKafkaService(EmbeddedKafka.start())))(_.stop())
}

val local: ZLayer[Any, Nothing, Kafka] = ZLayer.succeed(DefaultLocal)
}
Loading

0 comments on commit 3394814

Please sign in to comment.