Single AutoAckConsumer

Here we create a single AutoAckConsumer and a single Publisher, and finally publish two messages: a simple String message and a Json message encoded using the fs2-rabbit-circe extension.
import java.nio.charset.StandardCharsets.UTF_8
import cats.data.Kleisli
import cats.effect._
import cats.implicits._
import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig
import dev.profunktor.fs2rabbit.interpreter.RabbitClient
import dev.profunktor.fs2rabbit.json.Fs2JsonEncoder
import dev.profunktor.fs2rabbit.model.AckResult.Ack
import dev.profunktor.fs2rabbit.model.AmqpFieldValue.{LongVal, StringVal}
import dev.profunktor.fs2rabbit.model._
import fs2._
class AutoAckFlow[F[_]: Concurrent, A](
    consumer: Stream[F, AmqpEnvelope[A]],
    logger: Pipe[F, AmqpEnvelope[A], AckResult],
    publisher: AmqpMessage[String] => F[Unit]
) {

  import io.circe.generic.auto._

  case class Address(number: Int, streetName: String)
  case class Person(id: Long, name: String, address: Address)

  private val jsonEncoder = new Fs2JsonEncoder
  import jsonEncoder.jsonEncode

  // JSON-encodes the Person payload into a String payload (fs2-rabbit-circe)
  val jsonPipe: Pipe[Pure, AmqpMessage[Person], AmqpMessage[String]] = _.map(jsonEncode[Person])

  val simpleMessage =
    AmqpMessage("Hey!", AmqpProperties(headers = Map("demoId" -> LongVal(123), "app" -> StringVal("fs2RabbitDemo"))))
  val classMessage = AmqpMessage(Person(1L, "Sherlock", Address(212, "Baker St")), AmqpProperties.empty)

  // Publishes both messages and runs the consumer concurrently
  val flow: Stream[F, Unit] =
    Stream(
      Stream(simpleMessage).covary[F].evalMap(publisher),
      Stream(classMessage).covary[F].through(jsonPipe).evalMap(publisher),
      consumer.through(logger).evalMap(ack => Sync[F].delay(println(ack)))
    ).parJoin(3)

}
class AutoAckConsumerDemo[F[_]: Concurrent](R: RabbitClient[F]) {
  private val queueName    = QueueName("testQ")
  private val exchangeName = ExchangeName("testEX")
  private val routingKey   = RoutingKey("testRK")

  // Encoder from String messages to UTF-8 bytes, required implicitly by createPublisher
  implicit val stringMessageEncoder =
    Kleisli[F, AmqpMessage[String], AmqpMessage[Array[Byte]]](s => s.copy(payload = s.payload.getBytes(UTF_8)).pure[F])

  // Logs each consumed message and emits an Ack result downstream
  def logPipe: Pipe[F, AmqpEnvelope[String], AckResult] = _.evalMap { amqpMsg =>
    Sync[F].delay(println(s"Consumed: $amqpMsg")).as(Ack(amqpMsg.deliveryTag))
  }

  // Declares the queue and exchange, binds them, creates a publisher and an
  // auto-ack consumer, and runs the flow defined above
  val program: F[Unit] = R.createConnectionChannel.use { implicit channel =>
    for {
      _         <- R.declareQueue(DeclarationQueueConfig.default(queueName))
      _         <- R.declareExchange(exchangeName, ExchangeType.Topic)
      _         <- R.bindQueue(queueName, exchangeName, routingKey)
      publisher <- R.createPublisher[AmqpMessage[String]](exchangeName, routingKey)
      consumer  <- R.createAutoAckConsumer[String](queueName)
      _         <- new AutoAckFlow[F, String](consumer, logPipe, publisher).flow.compile.drain
    } yield ()
  }
}
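
As a side note on the fs2-rabbit-circe encoding used by jsonPipe: jsonEncode[Person] is just a function from AmqpMessage[Person] to AmqpMessage[String]. The following small, standalone sketch (not part of the example above; the object name is illustrative, and it assumes fs2-rabbit-circe and circe-generic are on the classpath) prints the JSON payload produced for the same Person message:

import dev.profunktor.fs2rabbit.json.Fs2JsonEncoder
import dev.profunktor.fs2rabbit.model.{AmqpMessage, AmqpProperties}
import io.circe.generic.auto._

object JsonEncodeDemo extends App {
  case class Address(number: Int, streetName: String)
  case class Person(id: Long, name: String, address: Address)

  private val jsonEncoder = new Fs2JsonEncoder

  val message = AmqpMessage(Person(1L, "Sherlock", Address(212, "Baker St")), AmqpProperties.empty)

  // jsonEncode[Person] is a plain AmqpMessage[Person] => AmqpMessage[String] function,
  // the same one lifted into jsonPipe above with _.map(...)
  val encode: AmqpMessage[Person] => AmqpMessage[String] = jsonEncoder.jsonEncode[Person]

  // Prints something like: {"id":1,"name":"Sherlock","address":{"number":212,"streetName":"Baker St"}}
  println(encode(message).payload)
}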
At the edge of our program we define our effect type, monix.eval.Task in this case, and run the effects:
import cats.data.NonEmptyList
import cats.effect.{Blocker, ExitCode, Resource}
import dev.profunktor.fs2rabbit.config.{Fs2RabbitConfig, Fs2RabbitNodeConfig}
import dev.profunktor.fs2rabbit.interpreter.RabbitClient
import dev.profunktor.fs2rabbit.resiliency.ResilientStream
import monix.eval.{Task, TaskApp}
import java.util.concurrent.Executors
object MonixAutoAckConsumer extends TaskApp {

  private val config: Fs2RabbitConfig = Fs2RabbitConfig(
    virtualHost = "/",
    nodes = NonEmptyList.one(
      Fs2RabbitNodeConfig(
        host = "127.0.0.1",
        port = 5672
      )
    ),
    username = Some("guest"),
    password = Some("guest"),
    ssl = false,
    connectionTimeout = 3,
    requeueOnNack = false,
    internalQueueSize = Some(500),
    automaticRecovery = true
  )

  // Dedicated blocking pool for the AMQP client's blocking operations
  val blockerResource =
    Resource
      .make(Task(Executors.newCachedThreadPool()))(es => Task(es.shutdown()))
      .map(Blocker.liftExecutorService)

  override def run(args: List[String]): Task[ExitCode] =
    blockerResource.use { blocker =>
      RabbitClient[Task](config, blocker).flatMap { client =>
        // ResilientStream restarts the program if it fails (e.g. connection loss)
        ResilientStream
          .runF(new AutoAckConsumerDemo[Task](client).program)
          .as(ExitCode.Success)
      }
    }
}
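
monix.eval.Task is only one possible choice at the edge. As a sketch of an alternative (not part of this example; the object name is illustrative and the wiring simply mirrors the Monix version above, assuming the same cats-effect 2 setup that Blocker implies), the same AutoAckConsumerDemo program could be run with cats-effect's IO and IOApp, letting Blocker[IO] manage the blocking pool as a Resource:

import cats.data.NonEmptyList
import cats.effect.{Blocker, ExitCode, IO, IOApp}
import dev.profunktor.fs2rabbit.config.{Fs2RabbitConfig, Fs2RabbitNodeConfig}
import dev.profunktor.fs2rabbit.interpreter.RabbitClient
import dev.profunktor.fs2rabbit.resiliency.ResilientStream

object IOAutoAckConsumer extends IOApp {

  // Same connection settings as in MonixAutoAckConsumer above
  private val config: Fs2RabbitConfig = Fs2RabbitConfig(
    virtualHost = "/",
    nodes = NonEmptyList.one(Fs2RabbitNodeConfig(host = "127.0.0.1", port = 5672)),
    username = Some("guest"),
    password = Some("guest"),
    ssl = false,
    connectionTimeout = 3,
    requeueOnNack = false,
    internalQueueSize = Some(500),
    automaticRecovery = true
  )

  override def run(args: List[String]): IO[ExitCode] =
    // Blocker[IO] provides a dedicated blocking pool as a Resource
    Blocker[IO].use { blocker =>
      RabbitClient[IO](config, blocker).flatMap { client =>
        ResilientStream
          .runF(new AutoAckConsumerDemo[IO](client).program)
          .map(_ => ExitCode.Success)
      }
    }
}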