Client Metrics
RabbitMQ Java Client supports metrics collection via Dropwizard or Micrometer.
At the time of writing, both providers are included in amqp-client
5.9.0. You can instantiate a collector as shown below.
// Dropwizard registry that the collector below records into.
val registry: MetricRegistry = new MetricRegistry()
// Metrics collector backed by the Dropwizard registry.
val dropwizardCollector: StandardMetricsCollector = new StandardMetricsCollector(registry)
The collector is now ready to be passed to the client.
// Build the client with the Dropwizard collector attached.
RabbitClient
  .default[IO](config)
  .withMetricsCollector(dropwizardCollector)
  .resource
Expose via JMX
JMX provides a standard way to access an application's performance metrics. Dropwizard can report metrics
via JMX through its metrics-jmx module. Add it to your dependencies.
// Dropwizard core plus its JMX reporter module, both at the same version.
libraryDependencies ++= Seq(
  "io.dropwizard.metrics" % "metrics-core" % "4.1.5",
  "io.dropwizard.metrics" % "metrics-jmx"  % "4.1.5"
)
The module provides a JmxReporter for the metrics registry. The reporter has to be started and later closed,
so for ease of use it can be wrapped in a Resource using the acquire-release pattern.
/** Manages the lifecycle of a Dropwizard `JmxReporter` as a `Resource`. */
object JmxReporterResource {

  /** Builds a reporter for `registry` in the `com.rabbitmq.client.jmx` domain.
    *
    * The reporter is started on acquisition and closed on release.
    *
    * @param registry the metrics registry whose metrics are reported via JMX
    * @return a resource wrapping the started reporter
    */
  def make[F[_]: Sync](registry: MetricRegistry): Resource[F, JmxReporter] =
    Resource.make {
      Sync[F].delay {
        val started = JmxReporter.forRegistry(registry).inDomain("com.rabbitmq.client.jmx").build
        started.start()
        started
      }
    }(jmx => Sync[F].delay(jmx.close()))
}
Let’s initialise the FS2 RabbitMQ client and AMQP channel with metrics.
// Compose everything the program needs into one Resource; the JMX reporter is acquired first.
val resources = for {
_ <- JmxReporterResource.make[IO](registry)
// Client built with the Dropwizard metrics collector attached.
client <- RabbitClient.default[IO](config).withMetricsCollector(dropwizardCollector).resource
// Open a connection, then a channel on that connection.
channel <- client.createConnection.flatMap(client.createChannel)
} yield (channel, client)
// Run the program with the acquired channel and client.
val program = resources.use {
case (channel, client) =>
// Let's publish and consume and see the counters go up
// (body intentionally left out of this snippet)
}
The app will now expose the following metrics under the com.rabbitmq.client.jmx domain:
- Acknowledged messages
- Channels count
- Connections count
- Consumed messages
- Published messages
- Rejected messages
Full Listing
Let’s create an application, built on top of Cats Effect, that publishes and consumes messages and exposes its metrics via JMX.
import java.nio.charset.StandardCharsets.UTF_8
import cats.data.{Kleisli, NonEmptyList}
import cats.effect.{ExitCode, IO, IOApp, Resource, Sync}
import cats.implicits._
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.jmx.JmxReporter
import com.rabbitmq.client.impl.StandardMetricsCollector
import dev.profunktor.fs2rabbit.config.declaration.{DeclarationExchangeConfig, DeclarationQueueConfig}
import dev.profunktor.fs2rabbit.config.{Fs2RabbitConfig, Fs2RabbitNodeConfig}
import dev.profunktor.fs2rabbit.interpreter.RabbitClient
import dev.profunktor.fs2rabbit.model.AckResult.Ack
import dev.profunktor.fs2rabbit.model.ExchangeType.Topic
import dev.profunktor.fs2rabbit.model._
import dev.profunktor.fs2rabbit.examples.putStrLn
import fs2._
import scala.concurrent.duration._
/** Demo application that publishes a message every second, consumes and acknowledges it,
  * and exposes the RabbitMQ client's Dropwizard metrics over JMX.
  */
object DropwizardMetricsDemo extends IOApp {

  // Connection settings for a broker on localhost using the guest account.
  private val config: Fs2RabbitConfig = Fs2RabbitConfig(
    virtualHost = "/",
    nodes = NonEmptyList.one(Fs2RabbitNodeConfig(host = "127.0.0.1", port = 5672)),
    username = Some("guest"),
    password = Some("guest"),
    ssl = false,
    connectionTimeout = 3.seconds,
    requeueOnNack = false,
    requeueOnReject = false,
    internalQueueSize = Some(500),
    requestedHeartbeat = 60.seconds,
    automaticRecovery = true,
    automaticTopologyRecovery = true,
    clientProvidedConnectionName = Some("app:rabbit")
  )

  private val queueName    = QueueName("testQ")
  private val exchangeName = ExchangeName("testEX")
  private val routingKey   = RoutingKey("testRK")

  /** The message that is published repeatedly by the demo. */
  val simpleMessage: AmqpMessage[String] = AmqpMessage("Hey!", AmqpProperties.empty)

  /** Encodes a `String` payload as UTF-8 bytes, keeping the rest of the message unchanged.
    *
    * The type is spelled out because implicit definitions without an explicit type are
    * fragile under implicit resolution and are rejected by newer compilers.
    */
  implicit val stringMessageEncoder: Kleisli[IO, AmqpMessage[String], AmqpMessage[Array[Byte]]] =
    Kleisli { (message: AmqpMessage[String]) =>
      message.copy(payload = message.payload.getBytes(UTF_8)).pure[IO]
    }

  /** Wires the metrics registry, JMX reporter, client and channel together, then runs the
    * publish/consume loop until the stream terminates.
    *
    * @param args command-line arguments (unused)
    * @return `ExitCode.Success` once the program completes
    */
  override def run(args: List[String]): IO[ExitCode] = {
    val registry            = new MetricRegistry
    val dropwizardCollector = new StandardMetricsCollector(registry)

    // The JMX reporter is acquired first, followed by the client and a channel.
    val resources = for {
      _       <- JmxReporterResource.make[IO](registry)
      client  <- RabbitClient.default[IO](config).withMetricsCollector(dropwizardCollector).resource
      channel <- client.createConnection.flatMap(client.createChannel)
    } yield (channel, client)

    val program = resources.use {
      case (channel, client) =>
        // The client operations below take the channel implicitly.
        implicit val amqpChannel: AMQPChannel = channel

        // Declare the topology, then create the consumer/acker pair and the publisher.
        val setup =
          for {
            _             <- client.declareQueue(DeclarationQueueConfig.default(queueName))
            _             <- client.declareExchange(DeclarationExchangeConfig.default(exchangeName, Topic))
            _             <- client.bindQueue(queueName, exchangeName, routingKey)
            ackerConsumer <- client.createAckerConsumer[String](queueName)
            (acker, consumer) = ackerConsumer
            publisher <- client.createPublisher[AmqpMessage[String]](exchangeName, routingKey)
          } yield (consumer, acker, publisher)

        // flatMap rather than flatTap: the setup tuple is not needed downstream, and the
        // whole stream is drained anyway.
        Stream
          .eval(setup)
          .flatMap {
            case (consumer, acker, publisher) =>
              Stream(
                // Publish the same message once per second, forever.
                Stream(simpleMessage).evalMap(publisher).repeat.metered(1.second),
                // Log every consumed message and acknowledge it.
                consumer.through(logPipe).evalMap(acker)
              ).parJoin(2)
          }
          .compile
          .drain
    }

    program.as(ExitCode.Success)
  }

  /** Prints each consumed envelope and turns it into an `Ack` for its delivery tag. */
  def logPipe[F[_]: Sync]: Pipe[F, AmqpEnvelope[String], AckResult] =
    _.evalMap { amqpMsg =>
      putStrLn(s"Consumed: $amqpMsg").as(Ack(amqpMsg.deliveryTag))
    }
}
/** Manages the lifecycle of a Dropwizard `JmxReporter` as a `Resource`. */
object JmxReporterResource {

  /** Builds a reporter for `registry` in the `com.rabbitmq.client.jmx` domain.
    *
    * The reporter is started on acquisition and closed on release.
    *
    * @param registry the metrics registry whose metrics are reported via JMX
    * @return a resource wrapping the started reporter
    */
  def make[F[_]: Sync](registry: MetricRegistry): Resource[F, JmxReporter] =
    Resource.make {
      Sync[F].delay {
        val started = JmxReporter.forRegistry(registry).inDomain("com.rabbitmq.client.jmx").build
        started.start()
        started
      }
    }(jmx => Sync[F].delay(jmx.close()))
}