Publisher with Listener

It is possible to add a listener when creating a publisher to handle messages that cannot be routed.

The AMQP protocol defines two bits that can be set when publishing a message: mandatory and immediate. You can read more about them in the AMQP reference. However, RabbitMQ 3.x supports only the mandatory bit, so we don’t support the immediate bit either.

Bit Mandatory

This flag tells the server how to react if the message cannot be routed to a queue. If this flag is set, the server will return an unroutable message with a Return method. If this flag is zero, the server silently drops the message.

The server SHOULD implement the mandatory flag.
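
The rule above can be read as a small decision table. The sketch below is a hypothetical model written only for illustration: `MandatoryFlag`, `Outcome` and `route` are made-up names and are not part of fs2-rabbit or RabbitMQ. It assumes that "cannot be routed" means no queue matched the message.

```scala
// Hypothetical model of the routing decision described above.
// None of these names exist in fs2-rabbit or RabbitMQ.
object MandatoryFlag {
  sealed trait Outcome
  case object Routed   extends Outcome // at least one queue matched the message
  case object Returned extends Outcome // sent back to the publisher with a Return method
  case object Dropped  extends Outcome // silently discarded by the server

  // matchingQueues: number of queues the message could be routed to (assumed input)
  // mandatory:      value of the mandatory bit set by the publisher
  def route(matchingQueues: Int, mandatory: Boolean): Outcome =
    if (matchingQueues > 0) Routed
    else if (mandatory) Returned
    else Dropped
}
```

Only the `Returned` case ever reaches a publishing listener; with the flag unset, the publisher gets no signal that the message was lost.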

Creating a Publisher with Listener

A publisher with a listener is created by specifying an ExchangeName, a RoutingKey, a PublishingFlag and a listener, i.e. a function from PublishReturn to F[Unit]:

import cats.effect._
import dev.profunktor.fs2rabbit.model._
import dev.profunktor.fs2rabbit.interpreter.RabbitClient

val exchangeName = ExchangeName("testEX")
val routingKey   = RoutingKey("testRK")

val publishingFlag: PublishingFlag = PublishingFlag(mandatory = true)

val publishingListener: PublishReturn => IO[Unit] = pr => IO(println(s"Publish listener: $pr"))

def doSomething(publisher: String => IO[Unit]): IO[Unit] = IO.unit

def program(R: RabbitClient[IO]): IO[Unit] =
  R.createConnectionChannel.use { implicit channel =>
    // The listener is invoked for every message the broker returns as unroutable
    R.createPublisherWithListener[String](exchangeName, routingKey, publishingFlag, publishingListener)
      .flatMap(doSomething)
  }

Publishing a simple message

Once you have a Publisher you can start publishing messages by calling it:

import cats.effect.Sync
import dev.profunktor.fs2rabbit.model._

def publishSimpleMessage[F[_]: Sync](publisher: String => F[Unit]): F[Unit] = {
  val message = "Hello world!"
  publisher(message)
}
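
Because a publisher is just a function, it composes with the usual Cats combinators. As a minimal sketch, assuming the `cats` syntax import is available, several messages can be published in order with `traverse_`. `PublishBatch` and `publishAll` are hypothetical names introduced here, not part of fs2-rabbit.

```scala
import cats.Applicative
import cats.syntax.all._

// Hypothetical helper, not part of fs2-rabbit
object PublishBatch {
  // Calls the publisher once per message, left to right, and discards the results
  def publishAll[F[_]: Applicative](publisher: String => F[Unit], messages: List[String]): F[Unit] =
    messages.traverse_(publisher)
}
```

For example, `PublishBatch.publishAll(publisher, List("first", "second"))` yields a single `F[Unit]` that publishes both messages when run.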

NOTE: If the mandatory flag is set to true and no queue is bound to the target exchange with a matching routing key, the broker returns the message and it is passed to the assigned publishing listener.
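
A listener that only prints is hard to act on. One possible alternative, sketched here under the assumption that Cats Effect 3 is in use (so `cats.effect.Ref` is available), is to collect returned messages in a `Ref` so the program can inspect or republish them later. `ReturnedMessages` and `collectingListener` are hypothetical names, not part of fs2-rabbit, and the helper is kept generic so that it does not rely on the fields of PublishReturn.

```scala
import cats.effect.{IO, Ref}

// Hypothetical helper, not part of fs2-rabbit
object ReturnedMessages {
  // Builds a listener that prepends each returned value to the Ref,
  // so the most recently returned message is at the head of the list
  def collectingListener[A](returned: Ref[IO, List[A]]): A => IO[Unit] =
    a => returned.update(a :: _)
}
```

To use it as a publishing listener, instantiate `A` as `PublishReturn`: create the state with `Ref.of[IO, List[PublishReturn]](Nil)` and pass `ReturnedMessages.collectingListener(ref)` where `publishingListener` is expected.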