Skip to content

Commit

Permalink
collector-kafka: authenticate with Event Hubs using OAuth2 via MSI (c…
Browse files Browse the repository at this point in the history
…lose #401)
  • Loading branch information
spenes committed Jan 22, 2024
1 parent 6d0abf2 commit 3ed3a43
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 20 deletions.
8 changes: 8 additions & 0 deletions kafka/src/it/resources/collector.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,19 @@ collector {
name = ${TOPIC_GOOD}
brokers = ${BROKER}
maxBytes = ${MAX_BYTES}
producerConf = {
"security.protocol" = "PLAINTEXT"
"sasl.mechanism" = "GSSAPI"
}
}
bad {
name = ${TOPIC_BAD}
brokers = ${BROKER}
maxBytes = ${MAX_BYTES}
producerConf = {
"security.protocol" = "PLAINTEXT"
"sasl.mechanism" = "GSSAPI"
}
}
}
}
6 changes: 6 additions & 0 deletions kafka/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ collector {
retries = 10
maxBytes = 1000000
buffer = ${collector.streams.buffer}
producerConf = {
"security.protocol" = "SASL_SSL"
"sasl.mechanism" = "OAUTHBEARER"
"sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
"sasl.login.callback.handler.class": "com.snowplowanalytics.snowplow.collectors.scalastream.sinks.AzureAuthenticationCallbackHandler"
}
}

//Legacy style
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* Copyright (c) 2013-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/
package com.snowplowanalytics.snowplow.collectors.scalastream
package sinks

import java.net.URI
import java.{lang, util}

import javax.security.auth.callback.Callback
import javax.security.auth.callback.UnsupportedCallbackException
import javax.security.auth.login.AppConfigurationEntry

import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback

import com.microsoft.azure.credentials.MSICredentials

import com.nimbusds.jwt.JWTParser

class AzureAuthenticationCallbackHandler extends AuthenticateCallbackHandler {

val credentials: MSICredentials = {
val clientId = sys.env.get("AZURE_CLIENT_ID_FOR_EVENT_HUB").orElse(sys.env.get("AZURE_CLIENT_ID"))
val creds = new MSICredentials()
clientId.map(creds.withClientId).getOrElse(creds)
}

var sbUri: String = ""

override def configure(
configs: util.Map[String, _],
saslMechanism: String,
jaasConfigEntries: util.List[AppConfigurationEntry]
): Unit = {
val bootstrapServer =
configs
.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
.toString
.replaceAll("\\[|\\]", "")
.split(",")
.toList
.headOption match {
case Some(s) => s
case None => throw new Exception("Empty bootstrap servers list")
}
val uri = URI.create("https://" + bootstrapServer)
this.sbUri = uri.getScheme + "://" + uri.getHost
}

override def handle(callbacks: Array[Callback]): Unit =
callbacks.foreach {
case callback: OAuthBearerTokenCallback =>
val token = getOAuthBearerToken()
callback.token(token)
case callback => throw new UnsupportedCallbackException(callback)
}

def getOAuthBearerToken(): OAuthBearerToken = {
val accessToken = credentials.getToken(sbUri)
val jwt = JWTParser.parse(accessToken)
val claims = jwt.getJWTClaimsSet

new OAuthBearerToken {
override def value(): String = accessToken

override def lifetimeMs(): Long = claims.getExpirationTime.getTime

override def scope(): util.Set[String] = null

override def principalName(): String = null

override def startTimeMs(): lang.Long = null
}
}

override def close(): Unit = ()
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,17 @@ object KafkaConfigSpec {
timeLimit = 5000
),
config = KafkaSinkConfig(
maxBytes = 1000000,
brokers = "localhost:9092,another.host:9092",
retries = 10,
producerConf = None
maxBytes = 1000000,
brokers = "localhost:9092,another.host:9092",
retries = 10,
producerConf = Some(
Map(
"security.protocol" -> "SASL_SSL",
"sasl.mechanism" -> "OAUTHBEARER",
"sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
"sasl.login.callback.handler.class" -> "com.snowplowanalytics.snowplow.collectors.scalastream.sinks.AzureAuthenticationCallbackHandler"
)
)
)
),
bad = Config.Sink(
Expand All @@ -134,10 +141,17 @@ object KafkaConfigSpec {
timeLimit = 5000
),
config = KafkaSinkConfig(
maxBytes = 1000000,
brokers = "localhost:9092,another.host:9092",
retries = 10,
producerConf = None
maxBytes = 1000000,
brokers = "localhost:9092,another.host:9092",
retries = 10,
producerConf = Some(
Map(
"security.protocol" -> "SASL_SSL",
"sasl.mechanism" -> "OAUTHBEARER",
"sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
"sasl.login.callback.handler.class" -> "com.snowplowanalytics.snowplow.collectors.scalastream.sinks.AzureAuthenticationCallbackHandler"
)
)
)
),
useIpAddressAsPartitionKey = false
Expand Down
3 changes: 2 additions & 1 deletion project/BuildSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ object BuildSettings {
libraryDependencies ++= Seq(
Dependencies.Libraries.kafkaClients,
Dependencies.Libraries.mskAuth,

Dependencies.Libraries.azureAuth,

// integration tests dependencies
Dependencies.Libraries.IntegrationTests.specs2,
Dependencies.Libraries.IntegrationTests.specs2CE
Expand Down
24 changes: 13 additions & 11 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ object Dependencies {
val thrift = "0.15.0" // force this version to mitigate security vulnerabilities
val tracker = "2.0.0"
val dataDog4s = "0.32.0"
val azureAuth = "1.7.14"
}

object Libraries {
Expand All @@ -63,17 +64,18 @@ object Dependencies {
val datadogStatsd = "com.avast.cloud" %% "datadog4s-statsd" % V.dataDog4s

//sinks
val fs2PubSub = "com.permutive" %% "fs2-google-pubsub-grpc" % V.fs2PubSub
val jackson = "com.fasterxml.jackson.core" % "jackson-databind" % V.jackson
val kafkaClients = "org.apache.kafka" % "kafka-clients" % V.kafka
val kinesis = "com.amazonaws" % "aws-java-sdk-kinesis" % V.awsSdk
val log4j = "org.apache.logging.log4j" % "log4j-core" % V.log4j
val mskAuth = "software.amazon.msk" % "aws-msk-iam-auth" % V.mskAuth % Runtime // Enables AWS MSK IAM authentication https://github.com/snowplow/stream-collector/pull/214
val nettyAll = "io.netty" % "netty-all" % V.nettyAll
val nsqClient = "com.snowplowanalytics" % "nsq-java-client" % V.nsqClient
val pubsub = "com.google.cloud" % "google-cloud-pubsub" % V.pubsub
val sqs = "com.amazonaws" % "aws-java-sdk-sqs" % V.awsSdk
val sts = "com.amazonaws" % "aws-java-sdk-sts" % V.awsSdk % Runtime // Enables web token authentication https://github.com/snowplow/stream-collector/issues/169
val fs2PubSub = "com.permutive" %% "fs2-google-pubsub-grpc" % V.fs2PubSub
val jackson = "com.fasterxml.jackson.core" % "jackson-databind" % V.jackson
val kafkaClients = "org.apache.kafka" % "kafka-clients" % V.kafka
val kinesis = "com.amazonaws" % "aws-java-sdk-kinesis" % V.awsSdk
val log4j = "org.apache.logging.log4j" % "log4j-core" % V.log4j
val mskAuth = "software.amazon.msk" % "aws-msk-iam-auth" % V.mskAuth % Runtime // Enables AWS MSK IAM authentication https://github.com/snowplow/stream-collector/pull/214
val nettyAll = "io.netty" % "netty-all" % V.nettyAll
val nsqClient = "com.snowplowanalytics" % "nsq-java-client" % V.nsqClient
val pubsub = "com.google.cloud" % "google-cloud-pubsub" % V.pubsub
val sqs = "com.amazonaws" % "aws-java-sdk-sqs" % V.awsSdk
val sts = "com.amazonaws" % "aws-java-sdk-sts" % V.awsSdk % Runtime // Enables web token authentication https://github.com/snowplow/stream-collector/issues/169
val azureAuth = "com.microsoft.azure" % "azure-client-authentication" % V.azureAuth

//common unit tests
val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test
Expand Down

0 comments on commit 3ed3a43

Please sign in to comment.