diff --git a/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala index 2e11ac7fd..db71f81ff 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala @@ -333,10 +333,12 @@ object AdminSpec extends ZIOKafkaSpec { client.listConsumerGroupOffsets( Map(invalidGroupId -> ListConsumerGroupOffsetsSpec(Chunk.single(TopicPartition(topic, 0)))) ) - } yield assert(offsets.get(TopicPartition(topic, 0)).map(_.offset))(isSome(equalTo(msgConsume.toLong))) && - assert(invalidTopicOffsets)(isEmpty) && - assert(invalidTpOffsets)(isEmpty) && - assert(invalidGroupIdOffsets)(isEmpty) + } yield assert(offsets.get(groupId).flatMap(_.get(TopicPartition(topic, 0))).map(_.offset))( + isSome(equalTo(msgConsume.toLong)) + ) && + assert(invalidTopicOffsets.get(groupId))(isSome(isEmpty)) && + assert(invalidTpOffsets.get(groupId))(isSome(isEmpty)) && + assert(invalidGroupIdOffsets.get(invalidGroupId))(isSome(isEmpty)) } }, test("delete consumer group offsets") { @@ -363,7 +365,7 @@ object AdminSpec extends ZIOKafkaSpec { offsets <- client.listConsumerGroupOffsets( Map(groupId -> ListConsumerGroupOffsetsSpec(Chunk.single(TopicPartition(topic, 0)))) ) - } yield assert(offsets.get(TopicPartition(topic, 0)).map(_.offset))(isNone) + } yield assert(offsets.get(groupId).flatMap(_.get(TopicPartition(topic, 0))).map(_.offset))(isNone) } }, test("describe consumer groups") { diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala b/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala index 1a59e119a..863a0e59f 100644 --- a/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala +++ b/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala @@ -192,7 +192,7 @@ trait AdminClient { */ def listConsumerGroupOffsets( groupSpecs: Map[String, ListConsumerGroupOffsetsSpec] - ): Task[Map[TopicPartition, OffsetAndMetadata]] + ): Task[Map[String, Map[TopicPartition, OffsetAndMetadata]]] /** * List the consumer group offsets available in the cluster for the specified consumer groups. @@ -200,7 +200,7 @@ trait AdminClient { def listConsumerGroupOffsets( groupSpecs: Map[String, ListConsumerGroupOffsetsSpec], options: ListConsumerGroupOffsetsOptions - ): Task[Map[TopicPartition, OffsetAndMetadata]] + ): Task[Map[String, Map[TopicPartition, OffsetAndMetadata]]] /** * Alter offsets for the specified partitions and consumer group. @@ -628,22 +628,27 @@ object AdminClient { */ override def listConsumerGroupOffsets( groupSpecs: Map[String, ListConsumerGroupOffsetsSpec] - ): Task[Map[TopicPartition, OffsetAndMetadata]] = + ): Task[Map[String, Map[TopicPartition, OffsetAndMetadata]]] = fromKafkaFuture { ZIO.attemptBlocking( adminClient .listConsumerGroupOffsets(groupSpecs.map { case (groupId, offsetsSpec) => (groupId, offsetsSpec.asJava) }.asJava) - .partitionsToOffsetAndMetadata() + .all() ) } - .map(_.asScala.filter { case (_, om) => om ne null }.toMap.bimap(TopicPartition(_), OffsetAndMetadata(_))) + .map( + _.asScala.map { case (groupId, offsets) => + groupId -> offsets.asScala.filter { case (_, om) => om ne null }.toMap + .bimap(TopicPartition(_), OffsetAndMetadata(_)) + }.toMap + ) override def listConsumerGroupOffsets( groupSpecs: Map[String, ListConsumerGroupOffsetsSpec], options: ListConsumerGroupOffsetsOptions - ): Task[Map[TopicPartition, OffsetAndMetadata]] = + ): Task[Map[String, Map[TopicPartition, OffsetAndMetadata]]] = fromKafkaFuture { ZIO.attemptBlocking( adminClient @@ -651,10 +656,15 @@ object AdminClient { groupSpecs.map { case (groupId, offsetsSpec) => (groupId, offsetsSpec.asJava) }.asJava, options.asJava ) - .partitionsToOffsetAndMetadata() + .all() ) } - .map(_.asScala.filter { case (_, om) => om ne null }.toMap.bimap(TopicPartition(_), OffsetAndMetadata(_))) + .map( + _.asScala.map { case (groupId, offsets) => + groupId -> offsets.asScala.filter { case (_, om) => om ne null }.toMap + .bimap(TopicPartition(_), OffsetAndMetadata(_)) + }.toMap + ) /** * Alter offsets for the specified partitions and consumer group.