我们正在使用Prometheus和Grafana来监控我们的Kafka集群。
在我们的应用程序中,我们使用Kafka流,并且Kafka流有可能由于异常而停止。我们正在记录事件setUnCaughtExceptionHandler
但是,当流停止时,我们也需要某种警报。
我们目前拥有的是,jmx_exporter作为代理运行,并通过endpoint公开Kafka指标,prometheus从endpoint获取指标。
我们没有看到任何类型的指标来提供每个主题的活跃消费者数量。我们是否遗漏了什么?关于如何获取活跃消费者数量并在消费者停止时发送警报的任何建议。
我们有类似的需求,并将每个分区的Kafka消费者延迟添加到Grafana中,并且还添加了如果延迟超过指定阈值的警报(每个主题的阈值应该不同,具体取决于负载,例如对于某些主题可能是10,对于高负载-100000)。
您可以为每个kafka流添加状态侦听器,以防流处于错误状态,记录错误或发送电子邮件:
kafkaStream.setStateListener((newState, oldState) -> {
log.info("Kafka stream state changed [{}] >>>>> [{}]", oldState, newState);
if (newState == KafkaStreams.State.ERROR || newState == KafkaStreams.State.PENDING_SHUTDOWN) {
log.error("Kafka Stream is in [{}] state. Application should be restarted", newState);
}
});
您还可以添加健康检查指示器(例如通过RESTendpoint或通过spring-boot
HealthQueator
),提供流是否正在运行的信息:
KafkaStreams. State stream State=kafkaStream.state();state.isR不();
我也没有发现任何kafka流指标提供有关活动消费者或可用连接分区的信息,但对我来说,如果kafka流提供这样的数据(并希望它在未来的版本中可用),那就太好了。