January 27

How to integrate kafkajs-prometheus-exporter and the NestJS Prometheus module

Monitoring applications is crucial for understanding their performance, availability, and health. In this article, we’ll walk through setting up monitoring for a NestJS application using Prometheus.

There are many guides on setting up the Prometheus module in a NestJS application, and several on using the kafkajs-prometheus-exporter library, which for reasons unknown to me is not widely used. But there are no instructions on how to set up both together.

First, let’s install and set up the Prometheus module:

npm install @willsoto/nestjs-prometheus prom-client

import { Module } from "@nestjs/common";
import { PrometheusModule } from "@willsoto/nestjs-prometheus";

@Module({
  imports: [PrometheusModule.register()],
})
export class AppModule {}

At this stage you can run your application and visit localhost:3000/metrics to see default metrics.
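If you prefer to check the endpoint from a script rather than a browser, here is a minimal sketch that lists the metric names found in the Prometheus text output. The helper name metricNames and the sample text are illustrative assumptions, not part of any library; the sample only imitates what /metrics may return.

```typescript
// List metric names from Prometheus text exposition output.
// Each metric family is announced by a "# TYPE <name> <type>" line.
function metricNames(exposition: string): string[] {
  const names: string[] = [];
  for (const line of exposition.split("\n")) {
    const match = /^# TYPE (\S+) \S+/.exec(line.trim());
    const name = match?.[1];
    if (name) {
      names.push(name);
    }
  }
  return names;
}

// Hypothetical excerpt of a /metrics response; real output is much longer.
const sample = [
  "# HELP process_cpu_user_seconds_total Total user CPU time spent in seconds.",
  "# TYPE process_cpu_user_seconds_total counter",
  "process_cpu_user_seconds_total 0.12",
  "# HELP nodejs_eventloop_lag_seconds Lag of event loop in seconds.",
  "# TYPE nodejs_eventloop_lag_seconds gauge",
  "nodejs_eventloop_lag_seconds 0.003",
].join("\n");

console.log(metricNames(sample));
```

Against a running application on Node 18 or later, the same helper could be fed the body of a fetch to http://localhost:3000/metrics. Once the Kafka metrics are registered, their names should show up in the same list.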

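Next, install the exporter together with kafkajs (npm install kafkajs @christiangalsterer/kafkajs-prometheus-exporter). The snippet below shows the Kafka client configuration and provides utility methods for monitoring.

You don’t need to create a new Registry(), as other guides suggest. Just retrieve the existing default registry and pass it to the kafkajs-prometheus-exporter functions.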

// kafka.config.ts
import { Injectable } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Consumer, Kafka, Producer } from 'kafkajs';
import {
  monitorKafkaJSAdmin,
  monitorKafkaJSConsumer,
  monitorKafkaJSProducer,
} from '@christiangalsterer/kafkajs-prometheus-exporter';
import { register, Registry } from 'prom-client';

@Injectable()
export class KafkaConfig {
  private readonly _kafkaConfig: Kafka;
  private readonly registry: Registry;

  constructor(private readonly configService: ConfigService) {
    // Reuse the default prom-client registry, the one served at /metrics.
    this.registry = register;

    this._kafkaConfig = new Kafka({
      // kafkajs expects an array of "host:port" strings here.
      brokers: configService.get('KAFKA_BROKER'),
      clientId: configService.get('KAFKA_CLIENT_ID'),
    });

    // Register the admin metrics before connecting.
    const admin = this._kafkaConfig.admin();
    monitorKafkaJSAdmin(admin, this.registry);
    admin
      .connect()
      .then(() => console.log('Kafka admin connected'))
      .catch((err) => console.error('Kafka admin connection failed', err));
  }

  getConfig(): Kafka {
    return this._kafkaConfig;
  }

  monitorConsumer(consumer: Consumer): void {
    monitorKafkaJSConsumer(consumer, this.registry, {
      defaultLabels: { client_id: this.configService.get('KAFKA_CLIENT_ID') },
    });
  }

  monitorProducer(producer: Producer): void {
    monitorKafkaJSProducer(producer, this.registry, {
      defaultLabels: { client_id: this.configService.get('KAFKA_CLIENT_ID') },
    });
  }
}
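The kafkajs brokers option takes an array of "host:port" strings. If your KAFKA_BROKER value is stored as a single comma-separated string (an assumption; your configuration may already provide an array), a small helper like the hypothetical parseBrokers below could convert it before it reaches the Kafka constructor.

```typescript
// Turn a comma-separated broker list into the string[] that kafkajs expects.
// Throws early so a missing or empty setting fails at startup, not at connect time.
function parseBrokers(raw: string | undefined): string[] {
  if (!raw) {
    throw new Error("KAFKA_BROKER is not set");
  }
  const brokers = raw
    .split(",")
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);
  if (brokers.length === 0) {
    throw new Error("KAFKA_BROKER contains no broker addresses");
  }
  return brokers;
}

// Example with two hypothetical broker addresses.
console.log(parseBrokers("kafka-1:9092, kafka-2:9092"));
```

In the class above it would be used as brokers: parseBrokers(configService.get<string>('KAFKA_BROKER')).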

Below are short usage snippets. Note that connect() must be called after the metrics are registered.

// create producer
this.kafka = kafkaConfigService.getConfig();
this.producer = this.kafka.producer();
// monitor before connect()
kafkaConfigService.monitorProducer(this.producer);
await this.producer.connect();

// create consumer
this.kafka = kafkaConfigService.getConfig();
this.consumer = this.kafka.consumer(config); // config must include a groupId
// monitor before connect()
kafkaConfigService.monitorConsumer(this.consumer);
await this.consumer.connect();
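Why does the order matter? A plausible explanation, assuming the exporter works by subscribing to the instrumentation events that kafkajs clients emit, is that an event fired before the listener is attached is simply lost. The sketch below models this with a plain Node.js EventEmitter. FakeClient and monitor are stand-ins invented for illustration, not the kafkajs or exporter API.

```typescript
import { EventEmitter } from "node:events";

// Stand-in for a Kafka client: emits "connect" once connect() is called.
// This is a simplified model, not the kafkajs API.
class FakeClient extends EventEmitter {
  async connect(): Promise<void> {
    this.emit("connect");
  }
}

// Stand-in for an exporter's monitor function: counts the connect events
// that occur after the listener has been attached.
function monitor(client: FakeClient): { connections: number } {
  const stats = { connections: 0 };
  client.on("connect", () => {
    stats.connections += 1;
  });
  return stats;
}

async function demo(): Promise<{ early: number; late: number }> {
  // Correct order: attach the listener, then connect.
  const first = new FakeClient();
  const early = monitor(first);
  await first.connect();

  // Wrong order: the connect event fires before anyone is listening.
  const second = new FakeClient();
  await second.connect();
  const late = monitor(second);

  return { early: early.connections, late: late.connections };
}

demo().then((result) => console.log(result)); // { early: 1, late: 0 }
```

The client monitored before connect() records the connection, while the one monitored afterwards reports zero, which is the kind of gap you would see in a connection metric if the calls were swapped.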