Skip to content
This repository has been archived by the owner on May 9, 2023. It is now read-only.

Commit

Permalink
avoid throwing catching invalid group id exception (#87)
Browse files Browse the repository at this point in the history
  • Loading branch information
strokyl authored Sep 21, 2022
1 parent f020333 commit 74e3598
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 7 deletions.
1 change: 1 addition & 0 deletions src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ object Consumer {
for {
wrapper <- ConsumerAccess.fromJavaConsumer(javaConsumer, settings.closeTimeout)
runloop <- Runloop(
settings.hasGroupId,
wrapper,
settings.pollInterval,
settings.pollTimeout,
Expand Down
3 changes: 3 additions & 0 deletions src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ final case class ConsumerSettings(
def withGroupId(groupId: String): ConsumerSettings =
withProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)

private[consumer] def hasGroupId: Boolean =
properties.contains(ConsumerConfig.GROUP_ID_CONFIG)

def withGroupInstanceId(groupInstanceId: String): ConsumerSettings =
withProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId)

Expand Down
23 changes: 16 additions & 7 deletions src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package zio.kafka.consumer.internal

import java.util
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
import zio._
Expand All @@ -9,17 +8,17 @@ import zio.clock.Clock
import zio.duration._
import zio.kafka.consumer.Consumer.OffsetRetrieval
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
import zio.kafka.consumer.CommittableRecord
import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer
import zio.kafka.consumer.internal.Runloop.{ ByteArrayCommittableRecord, ByteArrayConsumerRecord, Command }
import zio.kafka.consumer.RebalanceListener
import zio.kafka.consumer.{ CommittableRecord, RebalanceListener }
import zio.stream._

import java.util
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.util.Try

private[consumer] final class Runloop(
hasGroupId: Boolean,
consumer: ConsumerAccess,
pollFrequency: Duration,
pollTimeout: Duration,
Expand Down Expand Up @@ -189,9 +188,17 @@ private[consumer] final class Runloop(
reqRecs.toArray[ByteArrayConsumerRecord](Array.ofDim[ByteArrayConsumerRecord](reqRecs.size))
)

fulfillAction = fulfillAction *> req.cont.succeed(
concatenatedChunk.map(CommittableRecord(_, commit(_), Try(consumer.consumer.groupMetadata()).toOption))
)
fulfillAction = fulfillAction *> req.cont.succeed(concatenatedChunk.map { record =>
CommittableRecord(
record = record,
commitHandle = commit,
consumerGroupMetadata =
if (hasGroupId)
try Some(consumer.consumer.groupMetadata())
catch { case _: Throwable => None }
else None
)
})
buf -= req.tp
}
}
Expand Down Expand Up @@ -444,6 +451,7 @@ private[consumer] object Runloop {
}

def apply(
hasGroupId: Boolean,
consumer: ConsumerAccess,
pollFrequency: Duration,
pollTimeout: Duration,
Expand Down Expand Up @@ -473,6 +481,7 @@ private[consumer] object Runloop {
shutdownRef <- Ref.make(false).toManaged_
subscribedRef <- Ref.make(false).toManaged_
runloop = new Runloop(
hasGroupId,
consumer,
pollFrequency,
pollTimeout,
Expand Down

0 comments on commit 74e3598

Please sign in to comment.