AckerConsumer
An AckerConsumer delegates the responsibility to acknowledge messages to the user. You are in total control of telling RabbitMQ when and if a message should be marked as consumed. Use this if you can’t lose any messages.
import cats.effect.IO
import dev.profunktor.fs2rabbit.model._
import dev.profunktor.fs2rabbit.interpreter.RabbitClient
import fs2.Stream

val queueName = QueueName("daQ")

def doSomething(consumer: Stream[IO, AmqpEnvelope[String]], acker: AckResult => IO[Unit]): IO[Unit] = IO.unit

def program(R: RabbitClient[IO]) =
  R.createConnectionChannel.use { implicit channel =>
    R.createAckerConsumer[String](queueName).flatMap { case (acker, consumer) =>
      doSomething(consumer, acker)
    }
  }
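In the example above, doSomething is a stub that returns IO.unit. As a minimal sketch of what such a function might do, the snippet below acknowledges each message only after it has been processed and sends a negative acknowledgement if processing fails. The names process and ackAfterProcessing are hypothetical and only serve the illustration; AckResult.Ack and AckResult.NAck are assumed to take the envelope's deliveryTag, which should be checked against the library version in use.

```scala
import cats.effect.IO
import dev.profunktor.fs2rabbit.model._
import fs2.Stream

// Hypothetical business logic; replace with real message handling.
def process(payload: String): IO[Unit] =
  IO(println(s"Received: $payload"))

// Ack a message only once processing succeeded; NAck it otherwise.
def ackAfterProcessing(
    consumer: Stream[IO, AmqpEnvelope[String]],
    acker: AckResult => IO[Unit]
): IO[Unit] =
  consumer
    .evalMap { envelope =>
      process(envelope.payload).attempt.flatMap {
        case Right(_) => acker(AckResult.Ack(envelope.deliveryTag))
        case Left(_)  => acker(AckResult.NAck(envelope.deliveryTag))
      }
    }
    .compile
    .drain
```

Because the acknowledgement is sent after process completes, a crash in the middle of processing leaves the message unacknowledged, and RabbitMQ can deliver it again.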
When creating a consumer, you can tune the configuration by using BasicQos and ConsumerArgs. By default, BasicQos is set to a prefetch size of 0, a prefetch count of 1, and global set to false. ConsumerArgs is None by default since it’s optional. When defined, you can indicate consumerTag (default is “”), noLocal (default is false), exclusive (default is false) and args (default is an empty Map[String, ?]).
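As a sketch of how these settings might be passed, the snippet below raises the prefetch count and supplies explicit consumer arguments. The field names follow the description above; the parameter names basicQos and consumerArgs on createAckerConsumer, the ConsumerTag wrapper, and the values chosen ("my-consumer", a prefetch count of 10) are assumptions made for illustration and should be verified against the library version in use.

```scala
import cats.effect.IO
import dev.profunktor.fs2rabbit.interpreter.RabbitClient
import dev.profunktor.fs2rabbit.model._

val queueName = QueueName("daQ")

// Allow up to 10 unacknowledged messages per consumer instead of the default 1.
val qos = BasicQos(prefetchSize = 0, prefetchCount = 10, global = false)

// Illustrative values; the consumer tag is an arbitrary example.
val consumerArgs = ConsumerArgs(
  consumerTag = ConsumerTag("my-consumer"),
  noLocal = false,
  exclusive = true,
  args = Map.empty
)

def tunedProgram(R: RabbitClient[IO]): IO[Unit] =
  R.createConnectionChannel.use { implicit channel =>
    R.createAckerConsumer[String](
      queueName,
      basicQos = qos,
      consumerArgs = Some(consumerArgs)
    ).flatMap { case (acker, consumer) =>
      // Acknowledge every message as soon as it arrives.
      consumer
        .evalMap(envelope => acker(AckResult.Ack(envelope.deliveryTag)))
        .compile
        .drain
    }
  }
```

A higher prefetch count lets the broker push more messages before earlier ones are acknowledged, which can improve throughput at the cost of more messages being redelivered if the consumer fails.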