Skip to content
This repository has been archived by the owner on May 9, 2023. It is now read-only.

Commit

Permalink
Add describeLogDirsAsync to AdminClient (#65)
Browse files Browse the repository at this point in the history
  • Loading branch information
azhur authored Jul 26, 2022
1 parent 67dcc6a commit ff9b607
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 0 deletions.
29 changes: 29 additions & 0 deletions src/main/scala/zio/kafka/admin/AdminClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,13 @@ trait AdminClient {
def describeLogDirs(
brokersId: Iterable[Int]
): ZIO[Any, Throwable, Map[Int, Map[String, LogDirDescription]]]

/**
* Describe the log directories of the specified brokers async
*/
def describeLogDirsAsync(
brokersId: Iterable[Int]
): ZIO[Any, Throwable, Map[Int, Task[Map[String, LogDirDescription]]]]
}

object AdminClient extends Accessible[AdminClient] {
Expand Down Expand Up @@ -534,6 +541,28 @@ object AdminClient extends Accessible[AdminClient] {
).map(
_.asScala.toMap.bimap(_.intValue, _.asScala.toMap.bimap(identity, LogDirDescription(_)))
)

/**
* Describe the log directories of the specified brokers async
*/
override def describeLogDirsAsync(
brokersId: Iterable[Int]
): ZIO[Any, Throwable, Map[Int, Task[Map[String, LogDirDescription]]]] =
blocking
.effectBlocking(
adminClient.describeLogDirs(brokersId.map(Int.box).asJavaCollection).descriptions()
)
.map(
_.asScala.view.map { case (brokerId, descriptionsFuture) =>
(
brokerId.intValue(),
ZIO
.fromCompletionStage(descriptionsFuture.toCompletionStage)
.map(_.asScala.toMap.map { case (k, v) => (k, LogDirDescription(v)) })
)

}.toMap
)
}

val live: ZLayer[Has[Blocking.Service] with Has[AdminClientSettings], Throwable, Has[AdminClient]] =
Expand Down
20 changes: 20 additions & 0 deletions src/test/scala/zio/kafka/AdminSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,26 @@ object AdminSpec extends DefaultRunnableSpec {
)
}
},
testM("describe log dirs async") {
KafkaTestUtils.withAdmin { implicit admin =>
for {
topicName <- randomTopic
_ <- admin.createTopic(AdminClient.NewTopic(topicName, numPartitions = 1, replicationFactor = 1))
node <- admin.describeClusterNodes().head.orElseFail(new NoSuchElementException())
logDirs <-
admin.describeLogDirsAsync(List(node.id)).flatMap { descriptions =>
ZIO.foreachPar(descriptions) { case (brokerId, descriptionAsync) =>
descriptionAsync.map(description => (brokerId, description))
}
}
} yield assert(logDirs)(
hasKey(
node.id,
hasValues(exists(hasField("replicaInfos", _.replicaInfos, hasKey(TopicPartition(topicName, 0)))))
)
)
}
},
test("should correctly handle no node (null) when converting JNode to Node") {
assert(AdminClient.Node.apply(null))(isNone)
},
Expand Down

0 comments on commit ff9b607

Please sign in to comment.