From a816525d4792c406511008cf5ac32c3f272c1a3e Mon Sep 17 00:00:00 2001 From: Nick Date: Fri, 20 Jan 2023 14:11:52 +0300 Subject: [PATCH] Add ssl check --- build.sbt | 21 +- .../src/main/resources/kafka-generate-ssl.sh | 207 ++++++++++++++++++ .../resources/keystore/kafka.keystore.jks | Bin 0 -> 4702 bytes .../src/main/resources/truststore/ca-key | 30 +++ .../resources/truststore/kafka.truststore.jks | Bin 0 -> 1254 bytes .../main/scala/zio/kafka/KafkaTestUtils.scala | 33 +++ .../scala/zio/kafka/ZIOSpecWithSslKafka.scala | 10 + .../main/scala/zio/kafka/embedded/Kafka.scala | 34 +++ .../src/test/scala/zio/kafka/AdminSpec.scala | 6 +- .../test/scala/zio/kafka/OOMSpecXmx300m.scala | 55 +++++ .../scala/zio/kafka/admin/AdminClient.scala | 10 +- .../scala/zio/kafka/consumer/Consumer.scala | 4 +- .../scala/zio/kafka/producer/Producer.scala | 2 + .../scala/zio/kafka/utils/SslHelper.scala | 64 ++++++ 14 files changed, 467 insertions(+), 9 deletions(-) create mode 100755 zio-kafka-test-utils/src/main/resources/kafka-generate-ssl.sh create mode 100644 zio-kafka-test-utils/src/main/resources/keystore/kafka.keystore.jks create mode 100644 zio-kafka-test-utils/src/main/resources/truststore/ca-key create mode 100644 zio-kafka-test-utils/src/main/resources/truststore/kafka.truststore.jks create mode 100644 zio-kafka-test-utils/src/main/scala/zio/kafka/ZIOSpecWithSslKafka.scala create mode 100644 zio-kafka-test/src/test/scala/zio/kafka/OOMSpecXmx300m.scala create mode 100644 zio-kafka/src/main/scala/zio/kafka/utils/SslHelper.scala diff --git a/build.sbt b/build.sbt index 67ffe043ba..d382cb6c1f 100644 --- a/build.sbt +++ b/build.sbt @@ -1,4 +1,5 @@ import sbt.Keys.{ fork, parallelExecution } +import sbt.Tests.{ Group, SubProcess } import scala.sys.process._ import scala.util.Try @@ -173,8 +174,26 @@ lazy val zioKafkaTest = ) else Seq(embeddedKafka) }, - testFrameworks := Seq(new TestFramework("zio.test.sbt.ZTestFramework")) + testFrameworks := Seq(new TestFramework("zio.test.sbt.ZTestFramework")), + Test / fork := true, + Test / testGrouping := groupByMemory((Test / definedTests).value) ) +def groupByMemory(allTests: Seq[TestDefinition]): Seq[Group] = allTests.groupBy { t => + val regex = """.*Xmx(\d+[mg])""".r + val data = regex + .findAllIn(t.name) + .matchData + .map { v => + v.group(1) + } + .toList + + data match { + case List(x) => (s"Xmx$x", ForkOptions().withRunJVMOptions(Vector(s"-Xmx$x"))) + case _ => ("Default", ForkOptions()) + } +}.map { case ((name, opts), tests) => Group(name, tests, SubProcess(opts)) }.toSeq + addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt") addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck") diff --git a/zio-kafka-test-utils/src/main/resources/kafka-generate-ssl.sh b/zio-kafka-test-utils/src/main/resources/kafka-generate-ssl.sh new file mode 100755 index 0000000000..39ff3f984f --- /dev/null +++ b/zio-kafka-test-utils/src/main/resources/kafka-generate-ssl.sh @@ -0,0 +1,207 @@ +#!/usr/bin/env bash + +set -e + +KEYSTORE_FILENAME="kafka.keystore.jks" +VALIDITY_IN_DAYS=3650 +DEFAULT_TRUSTSTORE_FILENAME="kafka.truststore.jks" +TRUSTSTORE_WORKING_DIRECTORY="truststore" +KEYSTORE_WORKING_DIRECTORY="keystore" +CA_CERT_FILE="ca-cert" +KEYSTORE_SIGN_REQUEST="cert-file" +KEYSTORE_SIGN_REQUEST_SRL="ca-cert.srl" +KEYSTORE_SIGNED_CERT="cert-signed" + +function file_exists_and_exit() { + echo "'$1' cannot exist. Move or delete it before" + echo "re-running this script." + exit 1 +} + +if [ -e "$KEYSTORE_WORKING_DIRECTORY" ]; then + file_exists_and_exit $KEYSTORE_WORKING_DIRECTORY +fi + +if [ -e "$CA_CERT_FILE" ]; then + file_exists_and_exit $CA_CERT_FILE +fi + +if [ -e "$KEYSTORE_SIGN_REQUEST" ]; then + file_exists_and_exit $KEYSTORE_SIGN_REQUEST +fi + +if [ -e "$KEYSTORE_SIGN_REQUEST_SRL" ]; then + file_exists_and_exit $KEYSTORE_SIGN_REQUEST_SRL +fi + +if [ -e "$KEYSTORE_SIGNED_CERT" ]; then + file_exists_and_exit $KEYSTORE_SIGNED_CERT +fi + +echo +echo "Welcome to the Kafka SSL keystore and truststore generator script." + +echo +echo "First, do you need to generate a trust store and associated private key," +echo "or do you already have a trust store file and private key?" +echo +echo -n "Do you need to generate a trust store and associated private key? [yn] " +read generate_trust_store + +trust_store_file="" +trust_store_private_key_file="" + +if [ "$generate_trust_store" == "y" ]; then + if [ -e "$TRUSTSTORE_WORKING_DIRECTORY" ]; then + file_exists_and_exit $TRUSTSTORE_WORKING_DIRECTORY + fi + + mkdir $TRUSTSTORE_WORKING_DIRECTORY + echo + echo "OK, we'll generate a trust store and associated private key." + echo + echo "First, the private key." + echo + echo "You will be prompted for:" + echo " - A password for the private key. Remember this." + echo " - Information about you and your company." + echo " - NOTE that the Common Name (CN) is currently not important." + + openssl req -new -x509 -keyout $TRUSTSTORE_WORKING_DIRECTORY/ca-key \ + -out $TRUSTSTORE_WORKING_DIRECTORY/$CA_CERT_FILE -days $VALIDITY_IN_DAYS + + trust_store_private_key_file="$TRUSTSTORE_WORKING_DIRECTORY/ca-key" + + echo + echo "Two files were created:" + echo " - $TRUSTSTORE_WORKING_DIRECTORY/ca-key -- the private key used later to" + echo " sign certificates" + echo " - $TRUSTSTORE_WORKING_DIRECTORY/$CA_CERT_FILE -- the certificate that will be" + echo " stored in the trust store in a moment and serve as the certificate" + echo " authority (CA). Once this certificate has been stored in the trust" + echo " store, it will be deleted. It can be retrieved from the trust store via:" + echo " $ keytool -keystore -export -alias CARoot -rfc" + + echo + echo "Now the trust store will be generated from the certificate." + echo + echo "You will be prompted for:" + echo " - the trust store's password (labeled 'keystore'). Remember this" + echo " - a confirmation that you want to import the certificate" + + keytool -keystore $TRUSTSTORE_WORKING_DIRECTORY/$DEFAULT_TRUSTSTORE_FILENAME \ + -alias CARoot -import -file $TRUSTSTORE_WORKING_DIRECTORY/$CA_CERT_FILE + + trust_store_file="$TRUSTSTORE_WORKING_DIRECTORY/$DEFAULT_TRUSTSTORE_FILENAME" + + echo + echo "$TRUSTSTORE_WORKING_DIRECTORY/$DEFAULT_TRUSTSTORE_FILENAME was created." + + # don't need the cert because it's in the trust store. + rm $TRUSTSTORE_WORKING_DIRECTORY/$CA_CERT_FILE +else + echo + echo -n "Enter the path of the trust store file. " + read -e trust_store_file + + if ! [ -f $trust_store_file ]; then + echo "$trust_store_file isn't a file. Exiting." + exit 1 + fi + + echo -n "Enter the path of the trust store's private key. " + read -e trust_store_private_key_file + + if ! [ -f $trust_store_private_key_file ]; then + echo "$trust_store_private_key_file isn't a file. Exiting." + exit 1 + fi +fi + +echo +echo "Continuing with:" +echo " - trust store file: $trust_store_file" +echo " - trust store private key: $trust_store_private_key_file" + +mkdir $KEYSTORE_WORKING_DIRECTORY + +echo +echo "Now, a keystore will be generated. Each broker and logical client needs its own" +echo "keystore. This script will create only one keystore. Run this script multiple" +echo "times for multiple keystores." +echo +echo "You will be prompted for the following:" +echo " - A keystore password. Remember it." +echo " - Personal information, such as your name." +echo " NOTE: currently in Kafka, the Common Name (CN) does not need to be the FQDN of" +echo " this host. However, at some point, this may change. As such, make the CN" +echo " the FQDN. Some operating systems call the CN prompt 'first / last name'" +echo " - A key password, for the key being generated within the keystore. Remember this." + +# To learn more about CNs and FQDNs, read: +# https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/X509ExtendedTrustManager.html + +keytool -keystore $KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME \ + -alias localhost -validity $VALIDITY_IN_DAYS -genkey -keyalg RSA + +echo +echo "'$KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME' now contains a key pair and a" +echo "self-signed certificate. Again, this keystore can only be used for one broker or" +echo "one logical client. Other brokers or clients need to generate their own keystores." + +echo +echo "Fetching the certificate from the trust store and storing in $CA_CERT_FILE." +echo +echo "You will be prompted for the trust store's password (labeled 'keystore')" + +keytool -keystore $trust_store_file -export -alias CARoot -rfc -file $CA_CERT_FILE + +echo +echo "Now a certificate signing request will be made to the keystore." +echo +echo "You will be prompted for the keystore's password." +keytool -keystore $KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME -alias localhost \ + -certreq -file $KEYSTORE_SIGN_REQUEST + +echo +echo "Now the trust store's private key (CA) will sign the keystore's certificate." +echo +echo "You will be prompted for the trust store's private key password." +openssl x509 -req -CA $CA_CERT_FILE -CAkey $trust_store_private_key_file \ + -in $KEYSTORE_SIGN_REQUEST -out $KEYSTORE_SIGNED_CERT \ + -days $VALIDITY_IN_DAYS -CAcreateserial +# creates $KEYSTORE_SIGN_REQUEST_SRL which is never used or needed. + +echo +echo "Now the CA will be imported into the keystore." +echo +echo "You will be prompted for the keystore's password and a confirmation that you want to" +echo "import the certificate." +keytool -keystore $KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME -alias CARoot \ + -import -file $CA_CERT_FILE +rm $CA_CERT_FILE # delete the trust store cert because it's stored in the trust store. + +echo +echo "Now the keystore's signed certificate will be imported back into the keystore." +echo +echo "You will be prompted for the keystore's password." +keytool -keystore $KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME -alias localhost -import \ + -file $KEYSTORE_SIGNED_CERT + +echo +echo "All done!" +echo +echo "Delete intermediate files? They are:" +echo " - '$KEYSTORE_SIGN_REQUEST_SRL': CA serial number" +echo " - '$KEYSTORE_SIGN_REQUEST': the keystore's certificate signing request" +echo " (that was fulfilled)" +echo " - '$KEYSTORE_SIGNED_CERT': the keystore's certificate, signed by the CA, and stored back" +echo " into the keystore" +echo -n "Delete? [yn] " +read delete_intermediate_files + +if [ "$delete_intermediate_files" == "y" ]; then + rm $KEYSTORE_SIGN_REQUEST_SRL + rm $KEYSTORE_SIGN_REQUEST + rm $KEYSTORE_SIGNED_CERT +fi diff --git a/zio-kafka-test-utils/src/main/resources/keystore/kafka.keystore.jks b/zio-kafka-test-utils/src/main/resources/keystore/kafka.keystore.jks new file mode 100644 index 0000000000000000000000000000000000000000..2142c078dcb6f8047130abc6a054d3274d4d84dd GIT binary patch literal 4702 zcma)AWmFUlvu0tJ&IJLF(|Zk%bkP6c$86a-~B`T97W0QjiAeUPKy+ z>pl0L_q+Gk_x+eNGtbO3Kc6`>Pz2Zv8-N2vfPuIKJV%?GG zX>%ET*_{CHFR>s8B7hUH+AM;db2!Oib-(WX*ifB;0dB{GeMK7hpA-q_5xcN`$9G)v z>t6|}pqxW$QJ#~>)cbreipFWDdzmLTEN5tbWCkvT=UY~KFPkayKrTL zgN4D?Q>^^_7+^@k zMLZqkxCp3kuXWin;qdr0cj08Wxs7IQag1FyZ1d(?BFSxBwOxpqVo$-Z@RY9?4YyHPJqs`rl!_{LD(7*XEfN-f#XBK5|+ zM*WPcAZi)Ho!?x~u8MJUhiQk&O*k%vH*C*Z?(*OEnqZmExSL!R@^(oxOrpJH(laUFTKdB?ain70LsQ#n)#lU|&J&F% z)c%W7fcbuO;q)aIR9w;I_OcnPl*fAq%y!N?;-;h5EbhUVh7A8{qdW}T^iqwG_Ai+E zuB$rB&)1I%&)OGXdXhF`OPO&1?#>K-g|mb0$t;x;Uh`S`TjDuq&e!yxDZ7fC%;{+S z3;r+j6uT8eneP&h=4rH=6?RDB@gvkAHN0@^f#TaQ!NhN5^Bg3R{kTuB$%~k;Q7H>8 zXXa6Q=755J@%6Bp_A!Rf{&{DZuP$AR4dIGwTv%2}9HYbKH5$75u@suUbOZc`J;5_s zdPGJAEI}&al!F-~JR#h6f)buc>0JqV(<-%sPg6X9q(wiuk2nK-c=ma<%&2~pyG}80 zq-L4N@*1V=pF*AZ@ur#MqY_0VSHjq7AnS1=Tcj%?=PARdLbZ3*F6R>`IN-`2X@!uZ-J5*L?91n0`UhfW*1o*+nrO=Joa_o1M-!%s2|6W6 zgb?i0afU=xo@%d3m6Eubx6W$VmXfnTppDkdRtZj6skZfU3gq;n{K2I&4}~@8)P^-L zzZ-7rxH8N^BDofdBsJe)LQ4Bv1!r9fEL4Z9o)Boqx_0`^gm7EQn<)g@gaAV0V?wc0 zkJ!{7TH)Z(;EQC`ioAbvct#12A?uIrjX98XJ>g(>o_-p`k~b`$!laC=ukJIdTekRU zydfkZU(uPy>m>-S6IS-Bs6EA(2=7N^5(Lr~7{U?1Ux#MBSTeC%dpbuZ!qZd}{qtep zPNj+LAo~{V%>b6Z44r7gl6~MbRvhL*t@EIOuU}l__@s3d-+a(G)jR1QEhZklLy&J$bS0OODFa+flA$jgZK(uqriB#>1sDhz1oZNMm{y!O*1qGRsC+@W99rliavD9H<}DqUc(0Rk#b%NHv0zu~ zd;ex=-u3v0LRoYAg*a_GS_-<=sMjkSi6AO~0V`0*sS?qf-$o4-YJ|%IO%K|AUwKmhx?7VtS0MJs zevN7Dx@+IF^QfE@zhv(+DSc_4=BIe?%bBNu1YD~wV5$1bKzswt&Rv3+Oex)} za*4kF`VKXU@~rc0ej42|Uz{>4?7Zy#r_v!_I6j)vreoiWsG;m!GNukE+u_@EF#V#V zxte5aTB1L6Fwu*j8Td!eECKEF?Y4h1pU}XrJ*4~Y7gp$nT%jHMXVaT5Ggnr?j7eZg zwVt`IMFspLnRb(hTCA+$-6lAW+U^xpe7wzGA~XUZi66z$Av4FjH~y8|L6RVY!z>!& zB#tRcxyJnFRP)YqDLm_Kc9dF`vW>`I)dC1)h^<*VUyg-xi+Y96R4GR|(67Cwwd|EF z%-WOV7}vBQ(`iQ>-u%(CUMJ#t8n=R{_3JW8|3s6PBZ>o$H(V8;1cgvtYWsBCMOv(F z$l{te)K<3_I!MNsV>qr}eepq!|E4jwX`9n?VfnEWRZJla(2_-x3f)vmm6;9EAKxsUI8ZFM;rhZ5^+ zHXSyxkgA9dBykpK;2`Ot`)pPp&yB@Z^VCHR`Ip6+YiP0VP-cYURwt-D-nbw2#TKMy z+jlLqi8|}n^PaMzVwXg}X!;F^XZC}o*Cfj%)t7 zo}WpCF2sOs2{gyn`6b5e$1OEh`+7&-WGx{D^-$=eQA zM(Nq5B)%^}TGFO=4zr88(f=+JbtgZWVVh=~=unQU;T-2zxAYNzxzy(>-7CBPi)g3t zqp-L0>{bDftwvn&TZKw5wW;x>TQBSPgb$W(jL)Uz-ec}hVPZqlP|n_?jg>*63H*XD z976*Np5JJWmqx^1)&!1pxQXT#0JW--^B(ca6cNt`Nb;DC+3JO^_=n$rquCYts&h*i zV6v_~Gb*_TD7?J}{d7X^5-w8RzR%J=-{^wdAGT2*UGLfYlc=AYC6uSoF#(QF!^)n| z?Z@T5=G|%_KFP39N*xcHyER|8W!bd*klV!iqGN-*TRjV_EY;{u1+6xWF|JFu=R!5; zMGcvwn!Q$Wp2sVeBKyf1b27fa%SNs=)=`^b%N1XpO8o17@f*wVmMNcOQ5fHEUnKf9 z_?DT}>t|=m8ah$Pj4(im@h!ZU&clPp+;rclCZlh`OTr_YaLGxr!5upitFzt5MAEdT z{V`*ioS1H^+IA$xVrLhlxm`)i?@YWAc?;3?xp?c8S}rrskPW{ImiGqA9iP4>n?Hu& z%>!e!8Fai;2ak8MA_DHG&>y#}m=$rIzWT!{*x(D)nQiEu$gTZOm08lM15?pqRdBujvwfYODfP%V$sCt@Wu)ZIC%9znVT5RD zagg7WhP9r)M7K!-BkL@A_5PpNzDA$-+LXW!4b!Y_>=_VvxxGVfM$ zw&Y_RP9stX1GWJRdpT+L2dq(_-TMyHbK-Q&LA4Wu%*a7^qc?)&D7LjuBSTyyygv92 z^T!uHzn)5C^AZOmK&?t>hNf13J&a0H2pNxr@hj?wpMyM^DvM z7N_WyeAVb)nfD@nhGyQo`P4XSY28iL_G5@4O;7EOtrKW!L^VN*timPCzCjdl2k?E1 z!4LU3geZQ~!fu;$D5Yw-mG;+1)e^Kw3I#~rXSoKNap@d;D=*d+H6fj2FLsrOx@B6( z4{(mX=9Z?cusfwDT?349l3EuDy^PA{s#*YQU4;m*Ej$+QW9BPO1qs%U)ig8Qzynwl zn!9Y=?zWzyk%LA>WXQXYy$Ag|v>Oxoyu3ctM&`kH7 z4K^90LL2qfL@VV{(0Ca8?tDZAj@-n;cN7>tdL>NUMk>-GyB7zXcOTqInsybJBBQTZ z4-8PwuR9Ukc^h@e@tyBX%VE-*l1585(4o?lYyhDv?D?9Qhspe?;gQ^Yu63KYM6$rc7t^{qzVg=PI5-_^TA!+7Vi zGHomkd+5Hmjw559ozeUN)lqks>1F<16AVuKHq`DZu2?D8 zxpd-?bBT=v9c+L0_6FhPV7>1JIWA5qn)$33eRY>sX@?{p+TcpHa%HTwdwhr>@aoUs zO0k1ut0av+1a6K8S5XtAi`%X6m=5Zia>jU`|4KzUvpQY7i#=G4_=w{rON?RW`K5pQ z-Bsez0BAXh2|;j|HjqtZZWzKom#g!kZFV>rZdkTtNe@u60yq0oi&Vv8WYU#);@s4X zUry4dADIL?+nXH9%uMLd4a-m$9gWeNa(xNd%J43v!#?g%`O3{D$!nbCU6c8C^sIJF zLBNXs;5xr?+s53Dg^D-KX<$iH7#yUE8>|hxVZ0ETu;j_1!@@Z>u`crlMN~CWlf9`* z5P=3%w_85u*93jv9^MW92vN(^%o#8DHA-nT8gov*+eI9s+WhqIB}J+@eYqglIc<9} zfd~b3wEnB_?$PeDHNstc{tQ@zjk+U{`WU6&Ha1%1;fxpS zC*AMJ#PMV+)ZjdCkdqBrhIfb{R_e#@}uYzrQZ}6_AZ4rk(uKe^L(Gbg6i+zlHqv7eBIu*Z^f`p zCQ=07ZQ>^+w#yy*m}Q|{tUrZ?wWHbzMx0d`$*o+c`r5p5WUnJzC*q>x#bT5~*&X;3 zkAtHRQ$;k3lJU}*Y%2Tkhh~H4x|_8-H4B8R7iBuDJU^437?$9x8)97@v@28{k790x z8kzSkv&zOkSzpB!uj%~}Hq_bF9I+(dv>0O;zB_%tUyaVplK&{KqUihjdP9W!5HjeE zO9;;XWvcSx5nv?Ij!QtP4e9zP@h-Q_d<7!ol42YS)q)B^N&fk{U;{u{K*+Jno)o)H zYoRZOI-s_&{k{;lrv?B~WN#)sJ21|AISr&z4s^JAp2}ilHavaK4SQ1tt$fn+-zUO< E0e4%uwEzGB literal 0 HcmV?d00001 diff --git a/zio-kafka-test-utils/src/main/resources/truststore/ca-key b/zio-kafka-test-utils/src/main/resources/truststore/ca-key new file mode 100644 index 0000000000..dff8d516dc --- /dev/null +++ b/zio-kafka-test-utils/src/main/resources/truststore/ca-key @@ -0,0 +1,30 @@ +-----BEGIN ENCRYPTED PRIVATE KEY----- +MIIFHDBOBgkqhkiG9w0BBQ0wQTApBgkqhkiG9w0BBQwwHAQIzfQ361sbhwcCAggA +MAwGCCqGSIb3DQIJBQAwFAYIKoZIhvcNAwcECFnRtpSC5wpfBIIEyJcYOkR1kbjO +nnq2FuuObw3k2xwRqI/0EVLeQ8WPh3IPCDVlFLJ71S4HVhoUc9wJd+o+pnS3b9IR +fP8Hf8usTH9LYdnA4m8oeJD79sxbZZzRGaphSMm2AfOGxndhxFXj0uscHWj2b35w +vKDbQdicuxiN1oRo8lZWyVEBgBRDOX8azFoAINPPaTCLj1MSf2YByRMz5CXP83iC +ADkogExPJuIigzQT/UrW+cBPMeQQIvcv/acWAmN8NciQIlzsxJ3gCaFRVIkzuJM0 +CqY0hscXWRCWYtHjm0hF5SQSUsG+/5dfthXprN3/H5AZn3mqnBby8SvtyhoPmK89 +sNy9JbTeA37yDR7ifP8rbIsiMM+WrCTkRLz/SOAlcQ5CyY0DoQiyheKYzTfH8Qr7 +fcswFaRET2iQEBkDiTyA+tDHAkAqmRxOgiSyWoo7lmnhp7/DfxSes8Gfsz6JL93z +cidD9zF7/iRO8Ucnakx1AexyqKVylhQtGGQWXivgpP37cxYdP6Y1HJQAu+/tA0tB +6IreAOpL7M+ULL3hbNL/VTqs+QRN4psbO0BJvBOizoxj40bRMDGNd5Jo3tswmsax +/jb82XRW/drjXKNImo1Uk+HzNr3DCLcncSTWNaRrWMIf7mGs5KOi0vKfT6ZCutkF +Am1QPC5tV7taOprvQABl+OXbs44uSFrgbkTZh5TGADVK7X4xlCiW68aBislAb1pP +lp9CBlud+yirFVsQbPvR7rHTYqa8cEM+MfOKND+QHZ6RwtpFWVTuFKICFjHRrd0k +M42sT8IWVNBpQcFZ08U/0tudOU6Fpb+CwC1LVVYztdmhrTSJGsl/JyITO8AMsXk8 +0mCh5z4vHSeRv7+1/6YdQ67stUhJOYI25J18Ez1unRmp0otg3VGQ1RAoyaQ0u1KQ +78NlzB3JVRlCKbB798yMsmlvE1Pu0W81W200+6E0EC/29U8lKyLW26UndAC2wMbs +UHsYmLh+L2jeCdaFHEpxMRSysUX8T+ucHH3R0jQwYhFOHVxHpdSH0WU/wTjZUNC3 +5+nRTMg2I8nHlfE0/6osi2/y7DxPfGV/cxvybdBfrIgrZ0BXBqEQ2K+1zieqqIKQ +e0rDNByAd/EuyEeRYXA+8efnzzp76pnxPgNa7T3espqBGiWagXfSq0pJpNokSF2n +nqirsu+iReoCGjjP/LbR8vhWTbj940w5uupnK7T0NSuJsU1VjnsC2PmJNFYoHNsc +e/8R0qe2j8vFpPIM06L3z+zXb7w6FBehRXnvdHkdjv6j/jZ0856hLsWLYprQQK87 +hBFc9Qod2IM9RHM1DBLJdwTgMdaWi/NzeFU+ycGwk8Bq3f0i7rEN7wBzKWwr8k3Q +cKVomdOqVgvjmMo4KNhBi9c1WuSIl78LYhdb2g011m3PjWjhTobN7V2vWaioeR5U +ls9bA2CXlduebN+58LhGyJQ8HQ1IDFdXJGv/o8JLvlsa5ePSWo9JGUZM6cIjPXu0 +oLoGxd+vI6wmhS7QjuHH8jvoe7KiycnINAxrezcoW0jy43iisiT/OCxU2QNvmx81 +vjXQ9RoIC20C2XDS2egTjddJFRFOOL23wRXulgtZ11RzrvvqLVoJ5M7xkLcFbq5l +T7LLJiv2KMJkEbZCgZ+pgA== +-----END ENCRYPTED PRIVATE KEY----- diff --git a/zio-kafka-test-utils/src/main/resources/truststore/kafka.truststore.jks b/zio-kafka-test-utils/src/main/resources/truststore/kafka.truststore.jks new file mode 100644 index 0000000000000000000000000000000000000000..bffabd867fc472bec6ef52b779d720b86883cd15 GIT binary patch literal 1254 zcmV&LNQUvClCSwATSID2r7n1hW8Bu2?YQ! z9R>+thDZTr0|Wso1Q5ue1AN6V_)zIw#xW)=7%zZ=1K>|ux+k(es}aU)3xxdCMYeLP zIe<4vI}Hs)K@SD>CXbdqn6FVO1k0%0Z#ZiB8Y1P)M;!NVi{84;`;-M1z-F@5wx|N> z5$)aqPKIoqXeYVmBBraxe1+JrK&WU=pb|99I8s^HTrU&5>g;ljg0fWs z9cWOybdU7QYV>zu zi9}Gu`VrtepdT8sq-OJ*vyAqc6j7uOO9bf9HU z0>fu_VS%fU!+lsfySquG7~~-VpCN1|OmWJAmDaUbFn3k5tu2C#k0e1 ze{rWvyc*Hvub{K?=km3V0C&wxbvGr$%p`#DL`Z01fwt{OU4=+oKn^y#S50*sn)6Cs zh^FGCt7E%t5WGUn5YJTEg}YNq*2@M{GVyxVts~+lTQM5DF~o?F@U?%)GuU%%EtfhA z^Tu6*N<9*T&0b*}W$fxg?vZ_Jyx|T*|96opP%UqB_NBx&+uz-TfODF@! z$%9@IWt0c?d6DaKubGbw{ieFf+i7l#A9#u14+E*K;x`f%z*bQ&)EO|Q=p5D`)|}aE zK_@s!R?xZms3H}pRX++*LdXGcFe@`0->A2P2cZ79$7qS3Mq@!{L*V(dDTxoqj_Pz5$Y596b+b#~5R4MDGkqD)};W_z&;%1Fw5IpvZ;0YZUVZ9m=u(h~|a zo)JrKhzzxQ;j-I#lmdY%Fv*HI5LkTWK{b-oFZ1gRBMBMR9X&6lTy7z#>{g>KUT8`_ zzn^u=QGHwJS4G*;*uwVFF}mb+01Hoz_!Eu1D7yaRB7+usHTi{1&a5DpYcnp~i*66>QD`F}+yoXi_)QDo+KhC8#h>FflL<1_@w>NC9O7 z1OfpC00bb>nmi!aZT6v{T~>n#UCL^xfkjYfet0;4D5da-XW+C16hx0Fzs$1M^woA- QrhbbP1zDJqdjbL{5UTnzAOHXW literal 0 HcmV?d00001 diff --git a/zio-kafka-test-utils/src/main/scala/zio/kafka/KafkaTestUtils.scala b/zio-kafka-test-utils/src/main/scala/zio/kafka/KafkaTestUtils.scala index aefcccbbc4..0c49778247 100644 --- a/zio-kafka-test-utils/src/main/scala/zio/kafka/KafkaTestUtils.scala +++ b/zio-kafka-test-utils/src/main/scala/zio/kafka/KafkaTestUtils.scala @@ -11,6 +11,8 @@ import zio.kafka.embedded.Kafka import zio.kafka.producer._ import zio.kafka.serde.{ Deserializer, Serde, Serializer } +import java.nio.file.Paths + object KafkaTestUtils { val producerSettings: ZIO[Kafka, Nothing, ProducerSettings] = @@ -169,6 +171,29 @@ object KafkaTestUtils { ) ) + def sslAdminSettings: ZIO[Kafka, Nothing, AdminClientSettings] = + ZIO + .serviceWith[Kafka](_.bootstrapServers) + .flatMap(bootstrap => + ZIO.attempt { + val trustStorePath = Paths.get(Kafka.getClass.getResource("/truststore/kafka.truststore.jks").toURI).toFile + val keyStorePath = Paths.get(Kafka.getClass.getResource("/keystore/kafka.keystore.jks").toURI).toFile + + AdminClientSettings(bootstrap).withProperties( + "security.protocol" -> "SSL", + "ssl.truststore.location" -> trustStorePath.getAbsolutePath, + "ssl.truststore.password" -> "123456", + "ssl.keystore.location" -> keyStorePath.getAbsolutePath, + "ssl.keystore.password" -> "123456", + "ssl.key.password" -> "123456", + "ssl.enabled.protocols" -> "TLSv1.2", + "ssl.truststore.type" -> "JKS", + "ssl.keystore.type" -> "JKS" + ) + } + ) + .orDie + def withAdmin[T](f: AdminClient => RIO[Kafka, T]): ZIO[Kafka, Throwable, T] = for { settings <- adminSettings @@ -186,6 +211,14 @@ object KafkaTestUtils { fRes <- withAdminClient(settings)(f) } yield fRes + def withSslAdmin[T]( + f: AdminClient => RIO[Kafka, T] + ): ZIO[Kafka, Throwable, T] = + for { + settings <- sslAdminSettings + fRes <- withAdminClient(settings)(f) + } yield fRes + private def withAdminClient[R, T](settings: AdminClientSettings)(f: AdminClient => RIO[R, T]) = ZIO.scoped[R] { AdminClient diff --git a/zio-kafka-test-utils/src/main/scala/zio/kafka/ZIOSpecWithSslKafka.scala b/zio-kafka-test-utils/src/main/scala/zio/kafka/ZIOSpecWithSslKafka.scala new file mode 100644 index 0000000000..3995b11530 --- /dev/null +++ b/zio-kafka-test-utils/src/main/scala/zio/kafka/ZIOSpecWithSslKafka.scala @@ -0,0 +1,10 @@ +package zio.kafka + +import zio.ZLayer +import zio.kafka.embedded.Kafka +import zio.test._ + +trait ZIOSpecWithSslKafka extends ZIOSpec[TestEnvironment with Kafka] with KafkaRandom { + override val bootstrap: ZLayer[Any, Any, TestEnvironment with Kafka] = + testEnvironment ++ Kafka.sslEmbedded +} diff --git a/zio-kafka-test-utils/src/main/scala/zio/kafka/embedded/Kafka.scala b/zio-kafka-test-utils/src/main/scala/zio/kafka/embedded/Kafka.scala index 519b7684c7..55a37f2336 100644 --- a/zio-kafka-test-utils/src/main/scala/zio/kafka/embedded/Kafka.scala +++ b/zio-kafka-test-utils/src/main/scala/zio/kafka/embedded/Kafka.scala @@ -2,6 +2,11 @@ package zio.kafka.embedded import io.github.embeddedkafka.{ EmbeddedK, EmbeddedKafka, EmbeddedKafkaConfig } import zio._ +import _root_.kafka.server.KafkaConfig +import io.github.embeddedkafka.EmbeddedKafkaConfig.defaultKafkaPort +import org.apache.kafka.common.security.auth.SecurityProtocol + +import java.nio.file.Paths trait Kafka { def bootstrapServers: List[String] @@ -55,5 +60,34 @@ object Kafka { ZIO.acquireRelease(ZIO.attempt(Kafka.Sasl(EmbeddedKafkaService(EmbeddedKafka.start()))))(_.value.stop()) } + val sslEmbedded: ZLayer[Any, Throwable, Kafka] = ZLayer.scoped { + val listener = s"${SecurityProtocol.SSL}://localhost:$defaultKafkaPort" + + val keyStorePath = Paths.get(Kafka.getClass.getResource("/keystore/kafka.keystore.jks").toURI).toFile + val trustStorePath = Paths.get(Kafka.getClass.getResource("/truststore/kafka.truststore.jks").toURI).toFile + + implicit val embeddedKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig( + customBrokerProperties = Map( + "group.min.session.timeout.ms" -> "500", + "group.initial.rebalance.delay.ms" -> "0", + "authorizer.class.name" -> "kafka.security.authorizer.AclAuthorizer", + "super.users" -> "User:ANONYMOUS", + "ssl.client.auth" -> "required", + "ssl.enabled.protocols" -> "TLSv1.2", + "ssl.truststore.type" -> "JKS", + "ssl.keystore.type" -> "JKS", + "ssl.truststore.location" -> trustStorePath.getAbsolutePath, + "ssl.truststore.password" -> "123456", + "ssl.keystore.location" -> keyStorePath.getAbsolutePath, + "ssl.keystore.password" -> "123456", + "ssl.key.password" -> "123456", + KafkaConfig.InterBrokerListenerNameProp -> "SSL", + KafkaConfig.ListenersProp -> listener, + KafkaConfig.AdvertisedListenersProp -> listener + ) + ) + ZIO.acquireRelease(ZIO.attempt(EmbeddedKafkaService(EmbeddedKafka.start())))(_.stop()) + } + val local: ZLayer[Any, Nothing, Kafka] = ZLayer.succeed(DefaultLocal) } diff --git a/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala index db71f81fff..785d7dfdcb 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala @@ -4,7 +4,7 @@ import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource import org.apache.kafka.clients.admin.{ ConfigEntry, RecordsToDelete } import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.{ Node => JNode } -import zio.kafka.{ KafkaTestUtils, ZIOKafkaSpec } +import zio._ import zio.kafka.KafkaTestUtils._ import zio.kafka.admin.AdminClient.{ AlterConfigOp, @@ -22,15 +22,15 @@ import zio.kafka.admin.AdminClient.{ TopicPartition } import zio.kafka.admin.acl._ +import zio.kafka.admin.resource.{ PatternType, ResourcePattern, ResourcePatternFilter, ResourceType } import zio.kafka.consumer.{ CommittableRecord, Consumer, OffsetBatch, Subscription } import zio.kafka.embedded.Kafka import zio.kafka.serde.Serde +import zio.kafka.{ KafkaTestUtils, ZIOKafkaSpec } import zio.stream.ZSink import zio.test.Assertion._ import zio.test.TestAspect._ import zio.test._ -import zio._ -import zio.kafka.admin.resource.{ PatternType, ResourcePattern, ResourcePatternFilter, ResourceType } import java.util.UUID import java.util.concurrent.TimeoutException diff --git a/zio-kafka-test/src/test/scala/zio/kafka/OOMSpecXmx300m.scala b/zio-kafka-test/src/test/scala/zio/kafka/OOMSpecXmx300m.scala new file mode 100644 index 0000000000..b5f10b4b0b --- /dev/null +++ b/zio-kafka-test/src/test/scala/zio/kafka/OOMSpecXmx300m.scala @@ -0,0 +1,55 @@ +package zio.kafka + +import org.apache.kafka.clients.producer.ProducerRecord +import zio.kafka.consumer.{ Consumer, Subscription } +import zio.kafka.embedded.Kafka +import zio.kafka.producer.Producer +import zio.kafka.serde.Serde +import zio.test.TestAspect._ +import zio.test._ + +object OOMSpecXmx300m extends ZIOSpecWithSslKafka { + + override val kafkaPrefix: String = "oom-spec" + override def spec: Spec[Kafka, Any] = + suite("OOM check")( + test("producer should fail with ssl check") { + for { + result <- (for { + topic <- randomTopic + _ <- Producer.produce(new ProducerRecord(topic, "boo", "baa"), Serde.string, Serde.string) + } yield ()).provideSomeLayer(KafkaTestUtils.producer).exit + } yield assertTrue(result.isFailure) && + assertTrue( + result.toEither.left.map(_.getMessage()) == Left( + "Received an unexpected SSL packet from the server. Please ensure the client is properly configured with SSL enabled" + ) + ) + }, + test("consumer should fail with ssl check") { + for { + result <- (for { + topic <- randomTopic + _ <- Consumer.subscribe(Subscription.Topics(Set(topic))) + } yield ()).provideSomeLayer(KafkaTestUtils.consumer("test")).exit + } yield assertTrue(result.isFailure) && + assertTrue( + result.toEither.left.map(_.getMessage()) == Left( + "Received an unexpected SSL packet from the server. Please ensure the client is properly configured with SSL enabled" + ) + ) + }, + test("admin client should fail with ssl check") { + for { + result <- (KafkaTestUtils.withAdmin { client => + client.listTopics() + }).exit + } yield assertTrue(result.isFailure) && + assertTrue( + result.toEither.left.map(_.getMessage()) == Left( + "Received an unexpected SSL packet from the server. Please ensure the client is properly configured with SSL enabled" + ) + ) + } + ) @@ withLiveClock @@ sequential +} diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala b/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala index 863a0e59fc..f1341b808f 100644 --- a/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala +++ b/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala @@ -49,8 +49,8 @@ import org.apache.kafka.common.{ Uuid } import zio._ - import zio.kafka.admin.acl._ +import zio.kafka.utils.SslHelper import java.util.Optional import scala.annotation.{ nowarn, tailrec } @@ -1500,9 +1500,11 @@ object AdminClient { } def javaClientFromSettings(settings: AdminClientSettings): ZIO[Scope, Throwable, JAdmin] = - ZIO.acquireRelease(ZIO.attempt(JAdmin.create(settings.driverSettings.asJava)))(client => - ZIO.succeed(client.close(settings.closeTimeout)) - ) + ZIO.acquireRelease( + SslHelper.validateEndpoint(settings.bootstrapServers, settings.properties) *> ZIO.attempt( + JAdmin.create(settings.driverSettings.asJava) + ) + )(client => ZIO.succeed(client.close(settings.closeTimeout))) implicit final class MapOps[K1, V1](val v: Map[K1, V1]) extends AnyVal { def bimap[K2, V2](fk: K1 => K2, fv: V1 => V2): Map[K2, V2] = v.map(kv => fk(kv._1) -> fv(kv._2)) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index 60faeaa649..9d4c4da828 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -3,9 +3,10 @@ package zio.kafka.consumer import org.apache.kafka.clients.consumer.{ OffsetAndMetadata, OffsetAndTimestamp } import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo, TopicPartition } import zio._ -import zio.kafka.serde.Deserializer import zio.kafka.consumer.diagnostics.Diagnostics import zio.kafka.consumer.internal.{ ConsumerAccess, Runloop } +import zio.kafka.serde.Deserializer +import zio.kafka.utils.SslHelper import zio.stream.ZStream.Pull import zio.stream._ @@ -349,6 +350,7 @@ object Consumer { diagnostics: Diagnostics = Diagnostics.NoOp ): ZIO[Scope, Throwable, Consumer] = for { + _ <- SslHelper.validateEndpoint(settings.bootstrapServers, settings.properties) wrapper <- ConsumerAccess.make(settings) runloop <- Runloop( settings.hasGroupId, diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala index 3c720bed85..5268b81901 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala @@ -11,6 +11,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.kafka.common.{ Metric, MetricName } import zio._ import zio.kafka.serde.Serializer +import zio.kafka.utils.SslHelper import zio.stream.{ ZPipeline, ZStream } import java.util.concurrent.atomic.AtomicLong @@ -255,6 +256,7 @@ object Producer { def make(settings: ProducerSettings): ZIO[Scope, Throwable, Producer] = for { props <- ZIO.attempt(settings.driverSettings) + _ <- SslHelper.validateEndpoint(settings.bootstrapServers, props) rawProducer <- ZIO.attempt( new KafkaProducer[Array[Byte], Array[Byte]]( props.asJava, diff --git a/zio-kafka/src/main/scala/zio/kafka/utils/SslHelper.scala b/zio-kafka/src/main/scala/zio/kafka/utils/SslHelper.scala new file mode 100644 index 0000000000..44bf947256 --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/utils/SslHelper.scala @@ -0,0 +1,64 @@ +package zio.kafka.utils + +import zio.{ Task, ZIO } + +import java.net.InetSocketAddress +import java.nio.ByteBuffer +import java.nio.channels.SocketChannel + +object SslHelper { + // https://issues.apache.org/jira/browse/KAFKA-4090 + def validateEndpoint(bootstrapServers: List[String], props: Map[String, AnyRef]): Task[Unit] = + ZIO + .unless( + props + .get("security.protocol") + .collect { case x: String => + x + } + .exists(_.toLowerCase().contains("SSL")) + ) { + ZIO.foreachParDiscard(bootstrapServers) { str => + ZIO.scoped { + for { + address <- ZIO.attempt { + val arr = str.split(":") + val host = arr(0) + val port = arr(1).toInt + new InetSocketAddress(host, port) + } + channel <- ZIO.acquireRelease( + ZIO.attemptBlocking(SocketChannel.open(address)) + )(channel => ZIO.attempt(channel.close()).orDie) + tls <- ZIO.attemptBlocking { + val buf = ByteBuffer.allocate(5) + channel.write(buf) + buf.position(0) + channel.read(buf) + buf.position(0) + isTls(buf) + } + _ <- + ZIO.when(tls)( + ZIO.fail( + new IllegalArgumentException( + s"Received an unexpected SSL packet from the server. Please ensure the client is properly configured with SSL enabled" + ) + ) + ) + } yield () + } + + } + } + .unit + + private def isTls(buf: ByteBuffer): Boolean = { + val tlsMessageType = buf.get() + tlsMessageType match { + case 20 | 21 | 22 | 23 | 255 => + true + case _ => tlsMessageType >= 128 + } + } +}