Skip to content
This repository has been archived by the owner on May 9, 2023. It is now read-only.

Commit

Permalink
Return offsets of all consumer groups in AdminClient::listConsumerGro…
Browse files Browse the repository at this point in the history
…upOffsets (#131)

Co-authored-by: Jules Ivanic <guizmaii@users.noreply.github.com>
  • Loading branch information
gbecan and guizmaii authored Jan 19, 2023
1 parent 51b8d7b commit 5b14cca
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 13 deletions.
12 changes: 7 additions & 5 deletions zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,12 @@ object AdminSpec extends ZIOKafkaSpec {
client.listConsumerGroupOffsets(
Map(invalidGroupId -> ListConsumerGroupOffsetsSpec(Chunk.single(TopicPartition(topic, 0))))
)
} yield assert(offsets.get(TopicPartition(topic, 0)).map(_.offset))(isSome(equalTo(msgConsume.toLong))) &&
assert(invalidTopicOffsets)(isEmpty) &&
assert(invalidTpOffsets)(isEmpty) &&
assert(invalidGroupIdOffsets)(isEmpty)
} yield assert(offsets.get(groupId).flatMap(_.get(TopicPartition(topic, 0))).map(_.offset))(
isSome(equalTo(msgConsume.toLong))
) &&
assert(invalidTopicOffsets.get(groupId))(isSome(isEmpty)) &&
assert(invalidTpOffsets.get(groupId))(isSome(isEmpty)) &&
assert(invalidGroupIdOffsets.get(invalidGroupId))(isSome(isEmpty))
}
},
test("delete consumer group offsets") {
Expand All @@ -363,7 +365,7 @@ object AdminSpec extends ZIOKafkaSpec {
offsets <- client.listConsumerGroupOffsets(
Map(groupId -> ListConsumerGroupOffsetsSpec(Chunk.single(TopicPartition(topic, 0))))
)
} yield assert(offsets.get(TopicPartition(topic, 0)).map(_.offset))(isNone)
} yield assert(offsets.get(groupId).flatMap(_.get(TopicPartition(topic, 0))).map(_.offset))(isNone)
}
},
test("describe consumer groups") {
Expand Down
26 changes: 18 additions & 8 deletions zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,15 @@ trait AdminClient {
*/
def listConsumerGroupOffsets(
groupSpecs: Map[String, ListConsumerGroupOffsetsSpec]
): Task[Map[TopicPartition, OffsetAndMetadata]]
): Task[Map[String, Map[TopicPartition, OffsetAndMetadata]]]

/**
* List the consumer group offsets available in the cluster for the specified consumer groups.
*/
def listConsumerGroupOffsets(
groupSpecs: Map[String, ListConsumerGroupOffsetsSpec],
options: ListConsumerGroupOffsetsOptions
): Task[Map[TopicPartition, OffsetAndMetadata]]
): Task[Map[String, Map[TopicPartition, OffsetAndMetadata]]]

/**
* Alter offsets for the specified partitions and consumer group.
Expand Down Expand Up @@ -628,33 +628,43 @@ object AdminClient {
*/
override def listConsumerGroupOffsets(
groupSpecs: Map[String, ListConsumerGroupOffsetsSpec]
): Task[Map[TopicPartition, OffsetAndMetadata]] =
): Task[Map[String, Map[TopicPartition, OffsetAndMetadata]]] =
fromKafkaFuture {
ZIO.attemptBlocking(
adminClient
.listConsumerGroupOffsets(groupSpecs.map { case (groupId, offsetsSpec) =>
(groupId, offsetsSpec.asJava)
}.asJava)
.partitionsToOffsetAndMetadata()
.all()
)
}
.map(_.asScala.filter { case (_, om) => om ne null }.toMap.bimap(TopicPartition(_), OffsetAndMetadata(_)))
.map(
_.asScala.map { case (groupId, offsets) =>
groupId -> offsets.asScala.filter { case (_, om) => om ne null }.toMap
.bimap(TopicPartition(_), OffsetAndMetadata(_))
}.toMap
)

override def listConsumerGroupOffsets(
groupSpecs: Map[String, ListConsumerGroupOffsetsSpec],
options: ListConsumerGroupOffsetsOptions
): Task[Map[TopicPartition, OffsetAndMetadata]] =
): Task[Map[String, Map[TopicPartition, OffsetAndMetadata]]] =
fromKafkaFuture {
ZIO.attemptBlocking(
adminClient
.listConsumerGroupOffsets(
groupSpecs.map { case (groupId, offsetsSpec) => (groupId, offsetsSpec.asJava) }.asJava,
options.asJava
)
.partitionsToOffsetAndMetadata()
.all()
)
}
.map(_.asScala.filter { case (_, om) => om ne null }.toMap.bimap(TopicPartition(_), OffsetAndMetadata(_)))
.map(
_.asScala.map { case (groupId, offsets) =>
groupId -> offsets.asScala.filter { case (_, om) => om ne null }.toMap
.bimap(TopicPartition(_), OffsetAndMetadata(_))
}.toMap
)

/**
* Alter offsets for the specified partitions and consumer group.
Expand Down

0 comments on commit 5b14cca

Please sign in to comment.