Multiple Connections
This advanced case presents the challenge of having three RabbitMQ Connections interacting with each other.

We start by defining three different programs, one per connection, namely p1, p2 and p3. p1 defines a single Consumer for Connection 1, namely c1. p2 defines a single Consumer for Connection 2, namely c2. p3 defines a single Publisher for Connection 3.

We will be consuming messages from c1 and c2 and publishing the results via p3, all concurrently. Thanks to fs2, composing this is straightforward:
import cats.effect._
import cats.implicits._
import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig
import dev.profunktor.fs2rabbit.interpreter.RabbitClient
import dev.profunktor.fs2rabbit.model._
import fs2._

import scala.concurrent.ExecutionContext

// Required by cats-effect 2 to run concurrent computations on IO
implicit val cs = IO.contextShift(ExecutionContext.global)

// AMQP entities shared by the three programs
val q1 = QueueName("q1")
val ex = ExchangeName("testEX")
val rk = RoutingKey("RKA")
Here’s our program p1 creating a Consumer that represents the first Connection:
def p1(R: RabbitClient[IO]) =
  R.createConnectionChannel.use { implicit channel =>
    R.declareExchange(ex, ExchangeType.Topic) *>
      R.declareQueue(DeclarationQueueConfig.default(q1)) *>
      R.bindQueue(q1, ex, rk) *>
      R.createAutoAckConsumer[String](q1)
  }
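Note that p1 returns the consumer wrapped in IO: acquiring the channel and running the declarations happen first, and the auto-ack consumer stream is the result. As a small sketch of the resulting type (assuming a client: RabbitClient[IO] in scope, a name not defined in this example):

// Hypothetical `client`, shown only to spell out the type:
// the declarations run inside IO, which then yields the consumer stream.
val consumer1: IO[Stream[IO, AmqpEnvelope[String]]] = p1(client)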
Here’s our program p2 creating a Consumer that represents the second Connection:
def p2(R: RabbitClient[IO]) =
  R.createConnectionChannel.use { implicit channel =>
    R.declareExchange(ex, ExchangeType.Topic) *>
      R.declareQueue(DeclarationQueueConfig.default(q1)) *>
      R.bindQueue(q1, ex, rk) *>
      R.createAutoAckConsumer[String](q1)
  }
Here’s our program p3 creating a Publisher that represents the third Connection:
def p3(R: RabbitClient[IO]) =
  R.createConnectionChannel.use { implicit channel =>
    R.declareExchange(ex, ExchangeType.Topic) *>
      R.createPublisher[String](ex, rk)
  }
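The publisher created by p3 is simply a function: createPublisher[String] yields a String => IO[Unit], so publishing a message amounts to applying it to a payload. A small sketch of the type, again assuming a hypothetical client: RabbitClient[IO] in scope:

// Hypothetical `client`, shown only to spell out the type:
// the publisher is a plain publishing function wrapped in IO.
val publisher: IO[String => IO[Unit]] = p3(client)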
And finally we compose all three programs together:
val pipe: Pipe[IO, AmqpEnvelope[String], String] = _.map(_.payload)

def program(c: RabbitClient[IO]) =
  (p1(c), p2(c), p3(c)).mapN { case (c1, c2, pb) =>
    // Consume from both connections concurrently, publishing every payload via pb
    val s1 = c1.through(pipe).evalMap(pb)
    val s2 = c2.through(pipe).evalMap(pb)
    s1.concurrently(s2).compile.drain
  }.flatten // mapN wraps the drained stream in another IO, so flatten to get IO[Unit]