From 10796aab5d9015f61a4ecbec1ae2ddc2c4b3a42c Mon Sep 17 00:00:00 2001
From: spenes
Date: Mon, 11 Dec 2023 17:36:50 +0300
Subject: [PATCH] collector-kafka: authenticate with Event Hubs using OAuth2 via MSI (close #401)

---
 kafka/src/main/resources/application.conf     |  6 ++
 .../AzureAuthenticationCallbackHandler.scala  | 98 +++++++++++++++++++
 .../KafkaConfigSpec.scala                     | 30 ++++--
 project/BuildSettings.scala                   |  3 +-
 project/Dependencies.scala                    | 24 ++---
 5 files changed, 141 insertions(+), 20 deletions(-)
 create mode 100644 kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/AzureAuthenticationCallbackHandler.scala

diff --git a/kafka/src/main/resources/application.conf b/kafka/src/main/resources/application.conf
index 275fd19d1..4ed4706ca 100644
--- a/kafka/src/main/resources/application.conf
+++ b/kafka/src/main/resources/application.conf
@@ -11,6 +11,12 @@ collector {
       retries = 10
       maxBytes = 1000000
       buffer = ${collector.streams.buffer}
+      producerConf = {
+        "security.protocol" = "SASL_SSL"
+        "sasl.mechanism" = "OAUTHBEARER"
+        "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
+        "sasl.login.callback.handler.class": "com.snowplowanalytics.snowplow.collectors.scalastream.sinks.AzureAuthenticationCallbackHandler"
+      }
     }
 
     //Legacy style
diff --git a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/AzureAuthenticationCallbackHandler.scala b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/AzureAuthenticationCallbackHandler.scala
new file mode 100644
index 000000000..61fc70d50
--- /dev/null
+++ b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/AzureAuthenticationCallbackHandler.scala
@@ -0,0 +1,98 @@
+/**
+ * Copyright (c) 2013-present Snowplow Analytics Ltd.
+ * All rights reserved.
+ *
+ * This program is licensed to you under the Snowplow Community License Version 1.0,
+ * and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
+ * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
+ */
+package com.snowplowanalytics.snowplow.collectors.scalastream
+package sinks
+
+import java.net.URI
+import java.{lang, util}
+
+import javax.security.auth.callback.Callback
+import javax.security.auth.callback.UnsupportedCallbackException
+import javax.security.auth.login.AppConfigurationEntry
+
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback
+
+import com.microsoft.azure.credentials.MSICredentials
+
+import com.nimbusds.jwt.JWTParser
+
+/**
+ * Kafka login callback handler for SASL/OAUTHBEARER that obtains its access tokens
+ * from `MSICredentials`, so that the producer can authenticate with Event Hubs.
+ *
+ * It is selected by class name through the `sasl.login.callback.handler.class`
+ * producer setting.
+ */
+class AzureAuthenticationCallbackHandler extends AuthenticateCallbackHandler {
+
+  val credentials: MSICredentials = new MSICredentials()
+
+  /** Resource the token is requested for, in the form `https://<host>`; set by `configure`. */
+  var sbUri: String = ""
+
+  /**
+   * Derives the token resource URI from the first entry of `bootstrap.servers`.
+   *
+   * @throws IllegalArgumentException if `bootstrap.servers` is missing or empty
+   */
+  override def configure(
+    configs: util.Map[String, _],
+    saslMechanism: String,
+    jaasConfigEntries: util.List[AppConfigurationEntry]
+  ): Unit = {
+    // The value may be rendered as a list ("[a:9093, b:9093]"). Only the first entry
+    // is used because a comma separated list of brokers is not a valid URI authority.
+    val bootstrapServer = Option[Any](configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
+      .map(_.toString.replaceAll("\\[|\\]", ""))
+      .flatMap(_.split(",").map(_.trim).find(_.nonEmpty))
+      .getOrElse(
+        throw new IllegalArgumentException(s"${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG} must be set")
+      )
+    val uri = URI.create("https://" + bootstrapServer)
+    this.sbUri = uri.getScheme + "://" + uri.getHost
+  }
+
+  /** Supplies a newly requested token to every `OAuthBearerTokenCallback`; any other callback is rejected. */
+  override def handle(callbacks: Array[Callback]): Unit =
+    callbacks.foreach {
+      case callback: OAuthBearerTokenCallback =>
+        val token = getOAuthBearerToken()
+        callback.token(token)
+      case callback => throw new UnsupportedCallbackException(callback)
+    }
+
+  /** Requests an access token for `sbUri` and exposes it as a Kafka `OAuthBearerToken`. */
+  def getOAuthBearerToken(): OAuthBearerToken = {
+    val accessToken = credentials.getToken(sbUri)
+    val jwt         = JWTParser.parse(accessToken)
+    val claims      = jwt.getJWTClaimsSet
+
+    new OAuthBearerToken {
+      override def value(): String = accessToken
+
+      // Taken from the expiration time claim of the JWT, in milliseconds since the epoch
+      override def lifetimeMs(): Long = claims.getExpirationTime.getTime
+
+      // Kafka documents the scope as non-null but potentially empty
+      override def scope(): util.Set[String] = util.Collections.emptySet[String]()
+
+      // Kafka documents the principal name as non-null and non-empty
+      override def principalName(): String =
+        Option(claims.getSubject).filter(_.nonEmpty).getOrElse("unknown")
+
+      // Not derived from the token; Kafka accepts null when the start time is not known
+      override def startTimeMs(): lang.Long = null
+    }
+  }
+
+  override def close(): Unit = ()
+}
diff --git a/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala b/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala
index 4762ae8fb..3942386e5 100644
--- a/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala
+++ b/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala
@@ -105,10 +105,17 @@ object KafkaConfigSpec {
           timeLimit = 5000
         ),
         config = KafkaSinkConfig(
-          maxBytes     = 1000000,
-          brokers      = "localhost:9092,another.host:9092",
-          retries      = 10,
-          producerConf = None
+          maxBytes = 1000000,
+          brokers  = "localhost:9092,another.host:9092",
+          retries  = 10,
+          producerConf = Some(
+            Map(
+              "security.protocol" -> "SASL_SSL",
+              "sasl.mechanism" -> "OAUTHBEARER",
+              "sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
+              "sasl.login.callback.handler.class" -> "com.snowplowanalytics.snowplow.collectors.scalastream.sinks.AzureAuthenticationCallbackHandler"
+            )
+          )
         )
       ),
       bad = Config.Sink(
@@ -119,10 +126,17 @@ object KafkaConfigSpec {
           timeLimit = 5000
         ),
         config = KafkaSinkConfig(
-          maxBytes     = 1000000,
-          brokers      = "localhost:9092,another.host:9092",
-          retries      = 10,
-          producerConf = None
+          maxBytes = 1000000,
+          brokers  = "localhost:9092,another.host:9092",
+          retries  = 10,
+          producerConf = Some(
+            Map(
+              "security.protocol" -> "SASL_SSL",
+              "sasl.mechanism" -> "OAUTHBEARER",
+              "sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
+              "sasl.login.callback.handler.class" -> "com.snowplowanalytics.snowplow.collectors.scalastream.sinks.AzureAuthenticationCallbackHandler"
+            )
+          )
         )
       ),
       useIpAddressAsPartitionKey = false
diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala
index aa1684a10..8bf25d634 100644
--- a/project/BuildSettings.scala
+++ b/project/BuildSettings.scala
@@ -86,7 +86,8 @@ object BuildSettings {
     libraryDependencies ++= Seq(
       Dependencies.Libraries.kafkaClients,
       Dependencies.Libraries.mskAuth,
-
+      Dependencies.Libraries.azureAuth,
+
       // integration tests dependencies
       Dependencies.Libraries.IntegrationTests.specs2,
       Dependencies.Libraries.IntegrationTests.specs2CE
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index f6cd2ded6..916de7f5b 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -37,6 +37,7 @@ object Dependencies {
     val testcontainers = "0.40.10"
     val thrift = "0.15.0" // force this version to mitigate security vulnerabilities
     val tracker = "2.0.0"
+    val azureAuth = "1.7.14"
   }
 
   object Libraries {
@@ -58,17 +59,18 @@ object Dependencies {
     val trackerCore = "com.snowplowanalytics" %% "snowplow-scala-tracker-core" % V.tracker
 
     //sinks
-    val fs2PubSub    = "com.permutive"              %% "fs2-google-pubsub-grpc" % V.fs2PubSub
-    val jackson      = "com.fasterxml.jackson.core" % "jackson-databind"        % V.jackson
-    val kafkaClients = "org.apache.kafka"           % "kafka-clients"           % V.kafka
-    val kinesis      = "com.amazonaws"              % "aws-java-sdk-kinesis"    % V.awsSdk
-    val log4j        = "org.apache.logging.log4j"   % "log4j-core"              % V.log4j
-    val mskAuth      = "software.amazon.msk"        % "aws-msk-iam-auth"        % V.mskAuth % Runtime // Enables AWS MSK IAM authentication https://github.com/snowplow/stream-collector/pull/214
-    val nettyAll     = "io.netty"                   % "netty-all"               % V.nettyAll
-    val nsqClient    = "com.snowplowanalytics"      % "nsq-java-client"         % V.nsqClient
-    val pubsub       = "com.google.cloud"           % "google-cloud-pubsub"     % V.pubsub
-    val sqs          = "com.amazonaws"              % "aws-java-sdk-sqs"        % V.awsSdk
-    val sts          = "com.amazonaws"              % "aws-java-sdk-sts"        % V.awsSdk % Runtime // Enables web token authentication https://github.com/snowplow/stream-collector/issues/169
+    val fs2PubSub    = "com.permutive"              %% "fs2-google-pubsub-grpc"     % V.fs2PubSub
+    val jackson      = "com.fasterxml.jackson.core" % "jackson-databind"            % V.jackson
+    val kafkaClients = "org.apache.kafka"           % "kafka-clients"               % V.kafka
+    val kinesis      = "com.amazonaws"              % "aws-java-sdk-kinesis"        % V.awsSdk
+    val log4j        = "org.apache.logging.log4j"   % "log4j-core"                  % V.log4j
+    val mskAuth      = "software.amazon.msk"        % "aws-msk-iam-auth"            % V.mskAuth % Runtime // Enables AWS MSK IAM authentication https://github.com/snowplow/stream-collector/pull/214
+    val nettyAll     = "io.netty"                   % "netty-all"                   % V.nettyAll
+    val nsqClient    = "com.snowplowanalytics"      % "nsq-java-client"             % V.nsqClient
+    val pubsub       = "com.google.cloud"           % "google-cloud-pubsub"         % V.pubsub
+    val sqs          = "com.amazonaws"              % "aws-java-sdk-sqs"            % V.awsSdk
+    val sts          = "com.amazonaws"              % "aws-java-sdk-sts"            % V.awsSdk % Runtime // Enables web token authentication https://github.com/snowplow/stream-collector/issues/169
+    val azureAuth    = "com.microsoft.azure"        % "azure-client-authentication" % V.azureAuth
 
     //common unit tests
     val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test