From 16180c1583e90715acef2124037e4103d84e9810 Mon Sep 17 00:00:00 2001 From: rlavolee Date: Sat, 25 Nov 2023 21:23:24 +0100 Subject: [PATCH] makes consumer tag optional --- modules/client/src/main/scala/Channel.scala | 2 +- .../client/src/main/scala/MessagingAPI.scala | 4 ++-- .../main/scala/internal/LowlevelChannel.scala | 6 +++--- .../scala/internal/MessageDispatcher.scala | 20 ++++++++++++++----- .../scala/internal/FakeLowLevelChannel.scala | 2 +- .../internal/MessageDispatcherSuite.scala | 4 ++-- 6 files changed, 24 insertions(+), 14 deletions(-) diff --git a/modules/client/src/main/scala/Channel.scala b/modules/client/src/main/scala/Channel.scala index ccc681c..de609f6 100644 --- a/modules/client/src/main/scala/Channel.scala +++ b/modules/client/src/main/scala/Channel.scala @@ -84,7 +84,7 @@ object Channel { noAck: NoAck = true, exclusive: Boolean = false, arguments: FieldTable = FieldTable.empty, - ctag: ConsumerTag + ctag: Option[ConsumerTag] ): Stream[F, DeliveredMessageRaw] = import Stream.* resource(channel.delivered(ctag)).flatMap { case (ctag, data) => diff --git a/modules/client/src/main/scala/MessagingAPI.scala b/modules/client/src/main/scala/MessagingAPI.scala index bb7ee64..f37c5d0 100644 --- a/modules/client/src/main/scala/MessagingAPI.scala +++ b/modules/client/src/main/scala/MessagingAPI.scala @@ -44,7 +44,7 @@ trait Consuming[F[_]] { noAck: NoAck = true, exclusive: Boolean = false, arguments: FieldTable = FieldTable.empty, - consumerTag: ConsumerTag = ConsumerTag.random + consumerTag: Option[ConsumerTag] = None ): Stream[F, DeliveredMessageRaw] /** Consumes and decodes messages @@ -74,7 +74,7 @@ trait Consuming[F[_]] { noLocal: NoLocal = false, exclusive: Boolean = false, arguments: FieldTable = FieldTable.empty, - consumerTag: ConsumerTag = ConsumerTag.random + consumerTag: Option[ConsumerTag] = None )(using dec: MessageDecoder[T], F: RaiseThrowable[F] diff --git a/modules/client/src/main/scala/internal/LowlevelChannel.scala b/modules/client/src/main/scala/internal/LowlevelChannel.scala index 166b599..8af7fb6 100644 --- a/modules/client/src/main/scala/internal/LowlevelChannel.scala +++ b/modules/client/src/main/scala/internal/LowlevelChannel.scala @@ -51,9 +51,9 @@ private[client] trait ChannelTransmitter[F[_]] { def get(m: BasicClass.Get): F[Option[SynchronousGetRaw]] - def delivered(ctag: ConsumerTag): Resource[F, (ConsumerTag, Stream[F, DeliveredMessageRaw])] + def delivered(ctag: Option[ConsumerTag]): Resource[F, (ConsumerTag, Stream[F, DeliveredMessageRaw])] final def delivered: Resource[F, (ConsumerTag, Stream[F, DeliveredMessageRaw])] = - delivered(ConsumerTag.random) + delivered(None) def returned: Stream[F, ReturnedMessageRaw] def confirmed: Stream[F, Confirmation] @@ -154,7 +154,7 @@ private[client] object LowlevelChannel { def get(m: BasicClass.Get): F[Option[SynchronousGetRaw]] = content.get(m).flatMap(_.get) - def delivered(ctag: ConsumerTag): Resource[F, (ConsumerTag, Stream[F, DeliveredMessageRaw])] = + def delivered(ctag: Option[ConsumerTag]): Resource[F, (ConsumerTag, Stream[F, DeliveredMessageRaw])] = disp.deliveryQ(ctag).map { case (ctag, q) => (ctag, Stream.fromQueueNoneTerminated(q).interruptWhen(isClosed)) } diff --git a/modules/client/src/main/scala/internal/MessageDispatcher.scala b/modules/client/src/main/scala/internal/MessageDispatcher.scala index a4be3e0..d3316e7 100644 --- a/modules/client/src/main/scala/internal/MessageDispatcher.scala +++ b/modules/client/src/main/scala/internal/MessageDispatcher.scala @@ -20,6 +20,7 @@ package internal import cats.effect.Concurrent import cats.effect.kernel.Resource import cats.effect.std.* +import cats.effect.syntax.all.* import cats.syntax.all.* import lepus.protocol.domains.* @@ -29,11 +30,10 @@ private[client] trait MessageDispatcher[F[_]] { def confirm(msg: ConfirmationResponse): F[Unit] def cancel(ctag: ConsumerTag): F[Unit] - def deliveryQ(ctag: ConsumerTag) + def deliveryQ(ctag: Option[ConsumerTag]) : Resource[F, (ConsumerTag, QueueSource[F, Option[DeliveredMessageRaw]])] - final def deliveryQ: Resource[F, (ConsumerTag, QueueSource[F, Option[DeliveredMessageRaw]])] = - deliveryQ(ConsumerTag.random) + deliveryQ(None) def returnQ: QueueSource[F, ReturnedMessageRaw] def confirmationQ: QueueSource[F, ConfirmationResponse] @@ -48,6 +48,7 @@ private[client] object MessageDispatcher { dqs <- F.ref(Map.empty[ConsumerTag, Queue[F, Option[DeliveredMessageRaw]]]) rq <- Queue.bounded[F, ReturnedMessageRaw](returnedBufSize) cq <- Queue.bounded[F, ConfirmationResponse](confirmBufSize) + counter <- F.ref(0) } yield new { override def cancel(ctag: ConsumerTag): F[Unit] = @@ -56,6 +57,11 @@ private[client] object MessageDispatcher { case None => F.unit } + private def newCtag = counter + .getAndUpdate(_ + 1) + .map(i => ConsumerTag.from(s"consumer-$i").getOrElse(???)) + .toResource + override def deliver(msg: DeliveredMessageRaw): F[Unit] = dqs.get.map(_.get(msg.consumerTag)).flatMap { case Some(q) => q.offer(Some(msg)) @@ -64,10 +70,14 @@ private[client] object MessageDispatcher { override def `return`(msg: ReturnedMessageRaw): F[Unit] = rq.offer(msg) - override def deliveryQ(ctag: ConsumerTag): Resource[ + override def deliveryQ(ctag: Option[ConsumerTag]): Resource[ F, (ConsumerTag, QueueSource[F, Option[DeliveredMessageRaw]]) - ] = addQ(ctag).map((ctag, _)) + ] = + for { + ctag <- ctag.fold(newCtag)(Resource.pure) + q <- addQ(ctag) + } yield (ctag, q) private def addQ( ctag: ConsumerTag diff --git a/modules/client/src/test/scala/internal/FakeLowLevelChannel.scala b/modules/client/src/test/scala/internal/FakeLowLevelChannel.scala index d2ee645..bb5cf44 100644 --- a/modules/client/src/test/scala/internal/FakeLowLevelChannel.scala +++ b/modules/client/src/test/scala/internal/FakeLowLevelChannel.scala @@ -54,7 +54,7 @@ final class FakeLowLevelChannel( } - override def delivered(ctag: ConsumerTag) + override def delivered(ctag: Option[ConsumerTag]) : Resource[IO, (ConsumerTag, Stream[IO, DeliveredMessageRaw])] = Resource.eval(channel.get).flatMap(_.delivered(ctag)) diff --git a/modules/client/src/test/scala/internal/MessageDispatcherSuite.scala b/modules/client/src/test/scala/internal/MessageDispatcherSuite.scala index 3f018ba..9b90045 100644 --- a/modules/client/src/test/scala/internal/MessageDispatcherSuite.scala +++ b/modules/client/src/test/scala/internal/MessageDispatcherSuite.scala @@ -53,7 +53,7 @@ class MessageDispatcherSuite extends InternalTestSuite { forAllF(deliveries) { someMsg => for { d <- MessageDispatcher[IO]() - _ <- d.deliveryQ(someMsg.consumerTag).use { (ctag, q) => + _ <- d.deliveryQ(Some(someMsg.consumerTag)).use { (ctag, q) => q.size.assertEquals(0) >> d.deliver(someMsg) >> q.size.assertEquals(1) >> @@ -81,7 +81,7 @@ class MessageDispatcherSuite extends InternalTestSuite { forAllF(deliveries) { someMsg => for { d <- MessageDispatcher[IO]() - out <- d.deliveryQ(someMsg.consumerTag).use { (ctag, q) => + out <- d.deliveryQ(Some(someMsg.consumerTag)).use { (ctag, q) => IO(someMsg, q) } (msg, q) = out