JSON Message Consuming
A stream-based JSON decoder that can be connected to a stream of AmqpEnvelope
is provided by the extra dependency fs2-rabbit-circe.
Implicit decoders for your classes must be in scope.
You can use Circe’s automatic codec derivation, for example:
import cats.effect.IO
import dev.profunktor.fs2rabbit.json.Fs2JsonDecoder
import dev.profunktor.fs2rabbit.model.AckResult._
import dev.profunktor.fs2rabbit.model._
import io.circe._
import io.circe.generic.auto._
import fs2._
/** Address data nested inside [[Person]].
  *
  * NOTE(review): no codec is defined here; presumably the circe codec comes from the
  * `io.circe.generic.auto._` import — confirm.
  *
  * @param number     numeric part of the address
  * @param streetName name of the street
  */
case class Address(
    number: Int,
    streetName: String
)
/** Payload type that `program` decodes each incoming message into.
  *
  * NOTE(review): no codec is defined here; presumably the circe codec comes from the
  * `io.circe.generic.auto._` import — confirm.
  *
  * @param id      numeric identifier of the person
  * @param name    the person's name
  * @param address nested [[Address]] value
  */
case class Person(
    id: Long,
    name: String,
    address: Address
)
// Singleton instance of Fs2JsonDecoder. `program` imports its members
// (presumably including `jsonDecode` — confirm against the Fs2JsonDecoder API).
object ioDecoder extends Fs2JsonDecoder
/** Decodes every incoming envelope as a [[Person]] and routes the outcome.
  *
  *  - Decoding failure: the error is passed through `errorSink`, and afterwards the delivery is
  *    rejected with exactly one `NAck` via `acker`. The nack no longer depends on how many
  *    elements `errorSink` emits (a sink that emits nothing used to skip the nack entirely,
  *    and a sink that emits several elements used to nack several times).
  *  - Decoding success: the `(Person, DeliveryTag)` pair is passed through `processorSink`.
  *    This function does not acknowledge successful messages itself.
  *
  * @param consumer      stream of raw envelopes whose payload is a JSON string
  * @param acker         effectful function used to send the `NAck` for undecodable messages
  * @param errorSink     pipe that receives the circe decoding error
  * @param processorSink pipe that receives each decoded person with its delivery tag
  * @return a stream of the unit values produced by the nack and by `processorSink`
  */
def program(
    consumer: Stream[IO, AmqpEnvelope[String]],
    acker: AckResult => IO[Unit],
    errorSink: Pipe[IO, Error, Unit],
    processorSink: Pipe[IO, (Person, DeliveryTag), Unit]
): Stream[IO, Unit] = {
  import ioDecoder._

  consumer.map(jsonDecode[Person]).flatMap {
    case (Left(error), tag) =>
      // Run the sink purely for its effects, then nack exactly once.
      Stream.emit(error).covary[IO].through(errorSink).drain ++
        Stream.eval(acker(NAck(tag)))

    case (Right(msg), tag) =>
      Stream.emit((msg, tag)).covary[IO].through(processorSink)
  }
}