Skip to content

Commit

Permalink
makes consumer tag optional
Browse files Browse the repository at this point in the history
  • Loading branch information
rlavolee committed Nov 25, 2023
1 parent 9e4908a commit 16180c1
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 14 deletions.
2 changes: 1 addition & 1 deletion modules/client/src/main/scala/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ object Channel {
noAck: NoAck = true,
exclusive: Boolean = false,
arguments: FieldTable = FieldTable.empty,
ctag: ConsumerTag
ctag: Option[ConsumerTag]
): Stream[F, DeliveredMessageRaw] =
import Stream.*
resource(channel.delivered(ctag)).flatMap { case (ctag, data) =>
Expand Down
4 changes: 2 additions & 2 deletions modules/client/src/main/scala/MessagingAPI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ trait Consuming[F[_]] {
noAck: NoAck = true,
exclusive: Boolean = false,
arguments: FieldTable = FieldTable.empty,
consumerTag: ConsumerTag = ConsumerTag.random
consumerTag: Option[ConsumerTag] = None
): Stream[F, DeliveredMessageRaw]

/** Consumes and decodes messages
Expand Down Expand Up @@ -74,7 +74,7 @@ trait Consuming[F[_]] {
noLocal: NoLocal = false,
exclusive: Boolean = false,
arguments: FieldTable = FieldTable.empty,
consumerTag: ConsumerTag = ConsumerTag.random
consumerTag: Option[ConsumerTag] = None
)(using
dec: MessageDecoder[T],
F: RaiseThrowable[F]
Expand Down
6 changes: 3 additions & 3 deletions modules/client/src/main/scala/internal/LowlevelChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ private[client] trait ChannelTransmitter[F[_]] {

def get(m: BasicClass.Get): F[Option[SynchronousGetRaw]]

def delivered(ctag: ConsumerTag): Resource[F, (ConsumerTag, Stream[F, DeliveredMessageRaw])]
def delivered(ctag: Option[ConsumerTag]): Resource[F, (ConsumerTag, Stream[F, DeliveredMessageRaw])]
final def delivered: Resource[F, (ConsumerTag, Stream[F, DeliveredMessageRaw])] =
delivered(ConsumerTag.random)
delivered(None)

def returned: Stream[F, ReturnedMessageRaw]
def confirmed: Stream[F, Confirmation]
Expand Down Expand Up @@ -154,7 +154,7 @@ private[client] object LowlevelChannel {
def get(m: BasicClass.Get): F[Option[SynchronousGetRaw]] =
content.get(m).flatMap(_.get)

def delivered(ctag: ConsumerTag): Resource[F, (ConsumerTag, Stream[F, DeliveredMessageRaw])] =
def delivered(ctag: Option[ConsumerTag]): Resource[F, (ConsumerTag, Stream[F, DeliveredMessageRaw])] =
disp.deliveryQ(ctag).map { case (ctag, q) =>
(ctag, Stream.fromQueueNoneTerminated(q).interruptWhen(isClosed))
}
Expand Down
20 changes: 15 additions & 5 deletions modules/client/src/main/scala/internal/MessageDispatcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package internal
import cats.effect.Concurrent
import cats.effect.kernel.Resource
import cats.effect.std.*
import cats.effect.syntax.all.*
import cats.syntax.all.*
import lepus.protocol.domains.*

Expand All @@ -29,11 +30,10 @@ private[client] trait MessageDispatcher[F[_]] {
def confirm(msg: ConfirmationResponse): F[Unit]
def cancel(ctag: ConsumerTag): F[Unit]

def deliveryQ(ctag: ConsumerTag)
def deliveryQ(ctag: Option[ConsumerTag])
: Resource[F, (ConsumerTag, QueueSource[F, Option[DeliveredMessageRaw]])]

final def deliveryQ: Resource[F, (ConsumerTag, QueueSource[F, Option[DeliveredMessageRaw]])] =
deliveryQ(ConsumerTag.random)
deliveryQ(None)

def returnQ: QueueSource[F, ReturnedMessageRaw]
def confirmationQ: QueueSource[F, ConfirmationResponse]
Expand All @@ -48,6 +48,7 @@ private[client] object MessageDispatcher {
dqs <- F.ref(Map.empty[ConsumerTag, Queue[F, Option[DeliveredMessageRaw]]])
rq <- Queue.bounded[F, ReturnedMessageRaw](returnedBufSize)
cq <- Queue.bounded[F, ConfirmationResponse](confirmBufSize)
counter <- F.ref(0)
} yield new {

override def cancel(ctag: ConsumerTag): F[Unit] =
Expand All @@ -56,6 +57,11 @@ private[client] object MessageDispatcher {
case None => F.unit
}

private def newCtag = counter
.getAndUpdate(_ + 1)
.map(i => ConsumerTag.from(s"consumer-$i").getOrElse(???))
.toResource

override def deliver(msg: DeliveredMessageRaw): F[Unit] =
dqs.get.map(_.get(msg.consumerTag)).flatMap {
case Some(q) => q.offer(Some(msg))
Expand All @@ -64,10 +70,14 @@ private[client] object MessageDispatcher {

override def `return`(msg: ReturnedMessageRaw): F[Unit] = rq.offer(msg)

override def deliveryQ(ctag: ConsumerTag): Resource[
override def deliveryQ(ctag: Option[ConsumerTag]): Resource[
F,
(ConsumerTag, QueueSource[F, Option[DeliveredMessageRaw]])
] = addQ(ctag).map((ctag, _))
] =
for {
ctag <- ctag.fold(newCtag)(Resource.pure)
q <- addQ(ctag)
} yield (ctag, q)

private def addQ(
ctag: ConsumerTag
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ final class FakeLowLevelChannel(

}

override def delivered(ctag: ConsumerTag)
override def delivered(ctag: Option[ConsumerTag])
: Resource[IO, (ConsumerTag, Stream[IO, DeliveredMessageRaw])] =
Resource.eval(channel.get).flatMap(_.delivered(ctag))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class MessageDispatcherSuite extends InternalTestSuite {
forAllF(deliveries) { someMsg =>
for {
d <- MessageDispatcher[IO]()
_ <- d.deliveryQ(someMsg.consumerTag).use { (ctag, q) =>
_ <- d.deliveryQ(Some(someMsg.consumerTag)).use { (ctag, q) =>
q.size.assertEquals(0) >>
d.deliver(someMsg) >>
q.size.assertEquals(1) >>
Expand Down Expand Up @@ -81,7 +81,7 @@ class MessageDispatcherSuite extends InternalTestSuite {
forAllF(deliveries) { someMsg =>
for {
d <- MessageDispatcher[IO]()
out <- d.deliveryQ(someMsg.consumerTag).use { (ctag, q) =>
out <- d.deliveryQ(Some(someMsg.consumerTag)).use { (ctag, q) =>
IO(someMsg, q)
}
(msg, q) = out
Expand Down

0 comments on commit 16180c1

Please sign in to comment.