Skip to content
This repository has been archived by the owner on May 9, 2023. It is now read-only.

Commit

Permalink
fix SSL check for proxy and redpanda (#158)
Browse files Browse the repository at this point in the history
  • Loading branch information
gbecan authored and guizmaii committed Feb 28, 2023
1 parent 9d8becd commit d127429
Showing 1 changed file with 46 additions and 8 deletions.
54 changes: 46 additions & 8 deletions zio-kafka/src/main/scala/zio/kafka/utils/SslHelper.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package zio.kafka.utils

import org.apache.kafka.clients.{ ClientDnsLookup, ClientUtils }
import org.apache.kafka.common.network.TransferableChannel
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{ ApiVersionsRequest, RequestHeader }
import zio.{ Task, ZIO }

import java.nio.ByteBuffer
import java.nio.channels.SocketChannel
import java.nio.channels.{ FileChannel, SocketChannel }
import scala.jdk.CollectionConverters._

/**
Expand Down Expand Up @@ -40,13 +43,10 @@ object SslHelper {
ZIO.attempt(SocketChannel.open(addr))
)(channel => ZIO.attempt(channel.close()).orDie)
tls <- ZIO.attempt {
// Send a simple request and read the TLS record type from the answer
val buf = ByteBuffer.allocate(5)
channel.write(buf)
buf.position(0)
channel.read(buf)
buf.position(0)
isTls(buf)
// Send a simple request to check if the cluster accepts the connection
sendTestRequest(channel)
val buffer = readAnswerFromTestRequest(channel)
isTls(buffer)
}
_ <-
ZIO.when(tls)(
Expand All @@ -64,6 +64,44 @@ object SslHelper {
}
.unit

/**
* Send a simple request to check if connection can be established with current configuration
*/
private def sendTestRequest(channel: SocketChannel): Unit = {
val transferableChannel = new TransferableChannel {
override def hasPendingWrites: Boolean = false

override def transferFrom(fileChannel: FileChannel, position: Long, count: Long): Long =
throw new UnsupportedOperationException()

override def write(srcs: Array[ByteBuffer], offset: Int, length: Int): Long = channel.write(srcs, offset, length)

override def write(srcs: Array[ByteBuffer]): Long = channel.write(srcs)

override def write(src: ByteBuffer): Int = channel.write(src)

override def isOpen: Boolean = channel.isOpen

override def close(): Unit = channel.close()
}

// We send an API version request as a minimal, valid and fast request
val send = new ApiVersionsRequest.Builder()
.build(ApiKeys.API_VERSIONS.latestVersion())
.toSend(new RequestHeader(ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.latestVersion(), null, 0))
send.writeTo(transferableChannel)
()
}

/**
* Reads the 5 first bytes of the channel to extract the record type of the answer
*/
private def readAnswerFromTestRequest(channel: SocketChannel): ByteBuffer = {
val buf = ByteBuffer.allocate(5)
channel.read(buf)
buf.position(0)
}

/**
* Check if first byte of buffer corresponds to a record type from a TLS server
*/
Expand Down

0 comments on commit d127429

Please sign in to comment.