Consumers

There are two types of consumer: AutoAckConsumer and AckerConsumer. Each is parameterized on the effect type (e.g. IO) and the type of data it consumes (the payload).

EnvelopeDecoder

When creating a consumer, either by using createAutoAckConsumer or createAckerConsumer, you’ll need an instance of EnvelopeDecoder[F, A] in scope. A default instance for Strings is provided by the library; it will use the contentEncoding of the message to determine the charset and fall back to UTF-8 if that is not present. If you wish to decode the envelope’s payload in a different format you’ll need to provide an implicit instance. Here’s the definition:

type EnvelopeDecoder[F[_], A] = Kleisli[F, AmqpEnvelope[Array[Byte]], A]

Kleisli[F, AmqpEnvelope[Array[Byte]], A] is a wrapper around a function AmqpEnvelope[Array[Byte]] => F[A]. For example, you can write an EnvelopeDecoder for an array of bytes as follows:

implicit def bytesDecoder[F[_]: Applicative]: EnvelopeDecoder[F, Array[Byte]] =
  Kleisli(_.payload.pure[F])
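To make the "wrapper around a function" point concrete, here is a minimal sketch that depends only on cats (2.2 or later is assumed, so instances are found without extra imports). Envelope and Decoder are hypothetical stand-ins for the library's AmqpEnvelope and EnvelopeDecoder, reduced to what the example needs; they are not part of fs2-rabbit.

```scala
import cats.Applicative
import cats.data.Kleisli
import cats.syntax.all._

object KleisliSketch {
  // Hypothetical stand-in for AmqpEnvelope[Array[Byte]]: payload only.
  final case class Envelope(payload: Array[Byte])

  // Same shape as the EnvelopeDecoder alias, but over the stand-in type.
  type Decoder[F[_], A] = Kleisli[F, Envelope, A]

  // Wraps the function Envelope => F[Array[Byte]].
  def bytesDecoder[F[_]: Applicative]: Decoder[F, Array[Byte]] =
    Kleisli((env: Envelope) => env.payload.pure[F])

  // Running a decoder is just applying the wrapped function.
  val decoded: Option[Array[Byte]] =
    bytesDecoder[Option].run(Envelope("hi".getBytes("UTF-8")))
}
```

Option is used as the effect here only to keep the sketch small; any F with an Applicative instance would do.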

You can write all your EnvelopeDecoder instances this way, but it’s usually easier to build on existing instances. Kleisli forms a Monad whenever F does, so you can use all the usual combinators like map:

case class Foo(s: String)
implicit def fooDecoder[F[_]: ApplicativeError[?[_], Throwable]]: EnvelopeDecoder[F, Foo] =
  EnvelopeDecoder[F, String].map(Foo.apply)

Another useful combinator is flatMapF. For example a decoder for circe’s JSON type can be defined as follows:

import io.circe.parser._
import io.circe.Json
implicit def jsonDecoder[F[_]](implicit F: MonadError[F, Throwable]): EnvelopeDecoder[F, Json] =
  EnvelopeDecoder[F, String].flatMapF(s => F.fromEither(parse(s)))
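The following sketch shows how flatMapF surfaces a decoding failure in F rather than throwing. It depends only on cats (2.2 or later is assumed); Envelope, Decoder, stringDecoder and intDecoder are hypothetical names introduced for illustration, and the UTF-8-only stringDecoder is a simplification of the library's default String instance.

```scala
import cats.MonadError
import cats.data.Kleisli
import scala.util.Try

object FlatMapFSketch {
  // Hypothetical stand-in for AmqpEnvelope[Array[Byte]]: payload only.
  final case class Envelope(payload: Array[Byte])
  type Decoder[F[_], A] = Kleisli[F, Envelope, A]

  // Effect used for the demonstration: failures become Left values.
  type Result[A] = Either[Throwable, A]

  // Simplified String decoder: always UTF-8 (an assumption of this sketch).
  def stringDecoder[F[_]](implicit F: MonadError[F, Throwable]): Decoder[F, String] =
    Kleisli((env: Envelope) => F.catchNonFatal(new String(env.payload, "UTF-8")))

  // flatMapF runs an effectful step on the decoded value;
  // a NumberFormatException is raised inside F instead of being thrown.
  def intDecoder[F[_]](implicit F: MonadError[F, Throwable]): Decoder[F, Int] =
    stringDecoder[F].flatMapF(s => F.fromTry(Try(s.trim.toInt)))

  val ok: Result[Int]  = intDecoder[Result].run(Envelope("42".getBytes("UTF-8")))   // Right(42)
  val bad: Result[Int] = intDecoder[Result].run(Envelope("nope".getBytes("UTF-8"))) // Left(NumberFormatException)
}
```

With map alone the conversion would have to be total; flatMapF is the better fit when the second step can fail.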

For more details, please refer to the Kleisli documentation.

The library comes with a number of EnvelopeDecoders predefined in object EnvelopeDecoder. These allow you to easily access both optional and non-optional header fields, the AmqpProperties object and the payload (as an Array[Byte]). Refer to the source code for details.
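Because Kleisli also has an Applicative instance, decoders for different parts of an envelope can be combined with mapN. The sketch below illustrates the idea using only cats (2.2 or later is assumed); Envelope, optionalHeader, payloadText and Tagged are hypothetical names for this example, not the predefined decoders in object EnvelopeDecoder, whose exact names and signatures should be taken from the source code.

```scala
import cats.Applicative
import cats.data.Kleisli
import cats.syntax.all._

object CombineSketch {
  // Hypothetical stand-in for AmqpEnvelope: string headers plus raw payload.
  final case class Envelope(headers: Map[String, String], payload: Array[Byte])
  type Decoder[F[_], A] = Kleisli[F, Envelope, A]

  // Reads the payload as UTF-8 text (simplified; ignores contentEncoding).
  def payloadText[F[_]: Applicative]: Decoder[F, String] =
    Kleisli((env: Envelope) => Applicative[F].pure(new String(env.payload, "UTF-8")))

  // Reads a header that may be absent.
  def optionalHeader[F[_]: Applicative](name: String): Decoder[F, Option[String]] =
    Kleisli((env: Envelope) => env.headers.get(name).pure[F])

  final case class Tagged(tenant: Option[String], body: String)

  // Both decoders run against the same envelope; mapN combines their results.
  def tagged[F[_]: Applicative]: Decoder[F, Tagged] =
    (optionalHeader[F]("tenant"), payloadText[F]).mapN(Tagged.apply)
}
```

For instance, running `CombineSketch.tagged[Option]` on an envelope with a "tenant" header of "acme" and the payload "hello" yields `Some(Tagged(Some("acme"), "hello"))`.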

  • AutoAckConsumer: A consumer that acknowledges message consumption automatically.
  • AckerConsumer: A consumer that delegates the responsibility to acknowledge message consumption to the user.
  • Consuming Json: Consuming Json messages using the fs2-rabbit-circe module.