Multiple Connections
This advanced case presents the challenge of having multiple (3) RabbitMQ Connections
interacting with each other.
We start by defining three different programs, one per connection, namely p1, p2 and p3:
p1 defines a single Consumer for Connection 1, namely c1.
p2 defines a single Consumer for Connection 2, namely c2.
p3 defines a single Publisher for Connection 3.
We will consume messages from c1 and c2 concurrently and publish the results through the publisher created by p3. With fs2 this takes only a few lines:
import cats.effect._
import cats.implicits._
import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig
import dev.profunktor.fs2rabbit.interpreter.RabbitClient
import dev.profunktor.fs2rabbit.model._
import fs2._
import scala.concurrent.ExecutionContext
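// assumption: the global ExecutionContext backs the ContextShift
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)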
val q1 = QueueName("q1")
val ex = ExchangeName("testEX")
val rk = RoutingKey("RKA")
Here's our program p1, creating a Consumer representing the first Connection:
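def p1(R: RabbitClient[IO]) =
  R.createConnectionChannel.use { implicit channel =>
    R.declareExchange(ex, ExchangeType.Topic) *>
      R.declareQueue(DeclarationQueueConfig.default(q1)) *>
      R.bindQueue(q1, ex, rk) *>
      // assumption: an auto-ack consumer of String payloads, matching the pipe's input type
      R.createAutoAckConsumer[String](q1)
  }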
Here's our program p2, creating a Consumer representing the second Connection:
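def p2(R: RabbitClient[IO]) =
  R.createConnectionChannel.use { implicit channel =>
    R.declareExchange(ex, ExchangeType.Topic) *>
      R.declareQueue(DeclarationQueueConfig.default(q1)) *>
      R.bindQueue(q1, ex, rk) *>
      // assumption: an auto-ack consumer of String payloads, matching the pipe's input type
      R.createAutoAckConsumer[String](q1)
  }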
Here's our program p3, creating a Publisher representing the third Connection:
def p3(R: RabbitClient[IO]) =
  R.createConnectionChannel.use { implicit channel =>
    R.declareExchange(ex, ExchangeType.Topic) *>
      R.createPublisher(ex, rk)
  }
Finally, we compose the three programs:
// assumption: the pipe simply extracts each envelope's payload
val pipe: Pipe[IO, AmqpEnvelope[String], String] = _.map(_.payload)
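def program(c: RabbitClient[IO]) =
  (p1(c), p2(c), p3(c)).mapN { case (c1, c2, pb) =>
    // assumption: c1 drives the stream while c2 runs in the background; both publish through pb
    c1.through(pipe).evalMap(pb)
      .concurrently(c2.through(pipe).evalMap(pb))
      .compile
      .drain
  }.flatten // mapN yields IO[IO[Unit]]; flatten so the inner stream actually runs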
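The composition step can be tried without a broker. The following is a minimal sketch, assuming fs2 2.x and cats-effect 2.x on the classpath: two in-memory streams stand in for the consumers c1 and c2, and a hypothetical publish function backed by a Ref stands in for the publisher. It uses merge to interleave both sources into a single sink, which is one way to express the idea and not necessarily how the example above composes them. The names MergeSketch, demo, sink and publish are illustrative only.

```scala
import cats.effect.{ContextShift, IO}
import cats.effect.concurrent.Ref
import fs2.Stream
import scala.concurrent.ExecutionContext

object MergeSketch {

  // Runs two in-memory "consumers" concurrently and returns everything "published".
  def demo(): List[String] = {
    // merge needs Concurrent[IO], which is derived from this ContextShift
    implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

    // Stand-ins for the payload streams of c1 and c2 (hypothetical data, no RabbitMQ involved)
    val c1: Stream[IO, String] = Stream.emits(List("a1", "a2")).covary[IO]
    val c2: Stream[IO, String] = Stream.emits(List("b1", "b2")).covary[IO]

    val run: IO[List[String]] =
      for {
        sink   <- Ref.of[IO, List[String]](Nil)
        // Stand-in publisher: records each message instead of sending it to an exchange
        publish = (msg: String) => sink.update(msg :: _)
        // Interleave both sources nondeterministically and publish every element
        _      <- c1.merge(c2).evalMap(publish).compile.drain
        out    <- sink.get
      } yield out

    run.unsafeRunSync()
  }

  def main(args: Array[String]): Unit =
    println(demo().sorted)
}
```

Because merge interleaves elements nondeterministically, the order in which messages reach the sink can vary between runs, so the usage in main sorts the result before printing. Unlike concurrently, merge completes only after both sources have finished, which matters for finite streams like these but not for long-lived consumers.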