Fs2 Rabbit Client

RabbitClient is the main client that wraps the communication with RabbitMQ. The mandatory arguments are a Fs2RabbitConfig and a cats.effect.Blocker used for publishing (this action is blocking in the underlying Java client). Optionally, you can pass in a custom SSLContext and SaslConfig.

import cats.effect._
import com.rabbitmq.client.{DefaultSaslConfig, SaslConfig}
import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig
import dev.profunktor.fs2rabbit.interpreter.RabbitClient
import javax.net.ssl.SSLContext

object RabbitClient {
  def apply[F[_]: ConcurrentEffect: ContextShift](
    config: Fs2RabbitConfig,
    blocker: Blocker,
    sslContext: Option[SSLContext] = None,
    saslConfig: SaslConfig = DefaultSaslConfig.PLAIN
  ): F[RabbitClient[F]] = ???
}

Its creation is effectful so you need to flatMap and pass it as an argument. For example:

import cats.effect._
import cats.syntax.functor._
import dev.profunktor.fs2rabbit.interpreter.RabbitClient
import java.util.concurrent.Executors

object Program {
  def foo[F[_]](client: RabbitClient[F]): F[Unit] = ???
}

class Demo extends IOApp {

  val config: Fs2RabbitConfig = Fs2RabbitConfig(
    virtualHost = "/",
    host = "127.0.0.1",
    username = Some("guest"),
    password = Some("guest"),
    port = 5672,
    ssl = false,
    connectionTimeout = 3,
    requeueOnNack = false,
    internalQueueSize = Some(500)
  )

  val blockerResource =
    Resource
      .make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown()))
      .map(Blocker.liftExecutorService)

  override def run(args: List[String]): IO[ExitCode] =
    blockerResource.use { blocker =>
      RabbitClient[IO](config, blocker).flatMap { client =>
        Program.foo[IO](client).as(ExitCode.Success)
      }
    }

}