Skip to content
This repository has been archived by the owner on May 9, 2023. It is now read-only.

Prefer org.apache.kafka.clients.admin.Admin over org.apache.kafka.clients.admin.AdminClient as recommended in AdminClient documentation #69

Merged
merged 2 commits into from
Aug 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ import scala.sys.process._

import scala.util.Try

lazy val scala212 = "2.12.15"
lazy val scala212 = "2.12.16"
lazy val scala213 = "2.13.8"
lazy val scala3 = "3.1.2"
lazy val scala3 = "3.1.3"
lazy val mainScala = scala213
lazy val allScala = Seq(scala212, scala3, mainScala)

Expand Down Expand Up @@ -95,7 +95,7 @@ lazy val kafka =
"io.conduktor.kafka" % "kafka-clients" % kafkaClientsVersion,
"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.3",
"ch.qos.logback" % "logback-classic" % "1.2.11" % Test,
"org.scala-lang.modules" %% "scala-collection-compat" % "2.7.0"
"org.scala-lang.modules" %% "scala-collection-compat" % "2.8.1"
) ++ {
if (scalaBinaryVersion.value == "3")
Seq(
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.6.2
sbt.version=1.7.1
12 changes: 6 additions & 6 deletions src/main/scala/zio/kafka/admin/AdminClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package zio.kafka.admin

import org.apache.kafka.clients.admin.ListOffsetsResult.{ ListOffsetsResultInfo => JListOffsetsResultInfo }
import org.apache.kafka.clients.admin.{
AdminClient => JAdminClient,
Admin => JAdmin,
AlterConsumerGroupOffsetsOptions => JAlterConsumerGroupOffsetsOptions,
Config => JConfig,
ConsumerGroupDescription => JConsumerGroupDescription,
Expand Down Expand Up @@ -228,7 +228,7 @@ object AdminClient extends Accessible[AdminClient] {
* @param adminClient
*/
private final class LiveAdminClient(
private val adminClient: JAdminClient,
private val adminClient: JAdmin,
private val blocking: Blocking.Service
) extends AdminClient {

Expand Down Expand Up @@ -1036,20 +1036,20 @@ object AdminClient extends Accessible[AdminClient] {
def make(settings: AdminClientSettings): ZManaged[Blocking, Throwable, AdminClient] =
fromManagedJavaClient(javaClientFromSettings(settings))

def fromJavaClient(javaClient: JAdminClient): URIO[Blocking, AdminClient] =
def fromJavaClient(javaClient: JAdmin): URIO[Blocking, AdminClient] =
ZIO.service[Blocking.Service].map { blocking =>
new LiveAdminClient(javaClient, blocking)
}

def fromManagedJavaClient[R, E](
managedJavaClient: ZManaged[R, E, JAdminClient]
managedJavaClient: ZManaged[R, E, JAdmin]
): ZManaged[Blocking & R, E, AdminClient] =
managedJavaClient.flatMap { javaClient =>
ZManaged.fromEffect(fromJavaClient(javaClient))
}

def javaClientFromSettings(settings: AdminClientSettings): ZManaged[Any, Throwable, JAdminClient] =
ZManaged.makeEffect(JAdminClient.create(settings.driverSettings.asJava))(_.close(settings.closeTimeout))
def javaClientFromSettings(settings: AdminClientSettings): ZManaged[Any, Throwable, JAdmin] =
ZManaged.makeEffect(JAdmin.create(settings.driverSettings.asJava))(_.close(settings.closeTimeout))

implicit class MapOps[K1, V1](val v: Map[K1, V1]) extends AnyVal {
def bimap[K2, V2](fk: K1 => K2, fv: V1 => V2) = v.map(kv => fk(kv._1) -> fv(kv._2))
Expand Down