
Custom Kafka metrics using Apache Spark PrometheusServlet
by Vitor Teixeira | Feb 2023



Photo by Christin Hume on Unsplash

In this blog post, I will describe how to create and enhance the current Spark Structured Streaming metrics with Kafka consumer metrics and expose them using the Spark 3 PrometheusServlet, which can be targeted directly by Prometheus. In previous Spark versions, one had to set up either a JmxSink/JmxExporter, a GraphiteSink/GraphiteExporter, or a custom sink that pushes metrics to a PushGateway server. In other words, we could not avoid increasing the complexity of our solutions, since external components had to be deployed and integrated with our applications before Prometheus could scrape them.

Motivation

More than ever, observability is a must when it comes to software. It allows us to get insights into what is happening inside the software without having to interact with the system directly. One way of building upon this observability pillar is by exposing application metrics. When fed into an observability stack, they allow us to detect problems, either through alerts or by simply looking at a dashboard, and to find their root cause by analyzing the metrics.

Apache Spark applications are no different. It is true that one can access the Spark Web UI and gather insights into how an application is running, but when the number of applications grows ten- or a hundredfold, it becomes hard to troubleshoot them. That is when an observability tool like Grafana comes in handy. Grafana can connect to Prometheus as a data source, and Prometheus integrates seamlessly with our applications by scraping the PrometheusServlet.

When configured, Apache Spark exposes several metrics natively, which are detailed here. In Structured Streaming, no metrics are exposed by default unless we set "spark.sql.streaming.metricsEnabled" -> "true". Below is an example of the metrics that are exposed on a Kafka streaming job:

Default Spark Structured Streaming metrics

As we can see, these metrics are very generic and do not provide any detailed information about our source.
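For reference, enabling these default streaming metrics is a one-line configuration; a minimal sketch on an existing SparkSession (assumed here to be called spark) looks like this:

```scala
// Enable the built-in Structured Streaming metrics (reported to Spark's MetricsSystem).
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
```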

The goal is to be able to expose Kafka Consumer metrics that help us monitor how our event consumption is going.

Metrics should be quantifiable values that provide real-time insights about the status or performance of the application. In the scope of this article, we’ll be covering the following metrics:

  • Start offsets: The offsets where the streaming query first started.
  • End offsets: The last offsets processed by the streaming query. Tracks the consumer progress of the query.
  • Lead offsets: The latest offsets of the topic the streaming query is consuming. Tracks the evolution of the consumed topic.
  • Lag offsets: The difference between the last processed offsets and the lead offsets of the topic. Tracks how far behind real time the streaming query is.
  • Consumed rate: The consumption rate of the streaming query. It is the sum of the rates of all topics the query is subscribed to.
  • Last record timestamp: The timestamp of the last message consumed from each TopicPartition. It tracks the latency between producer and consumer.

The next step, after defining the metrics, is to create the metric source that will be responsible for exposing the metrics to Spark’s MetricsSystem.

In order to expose the metrics, we need to create a class that extends Source. This is what connects the executors to the driver so that the metrics are passed along as part of the heartbeat process.

KafkaMetricsSource implementation

Make sure to define the gauges as SettableGauge so that they can be updated during execution.
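The implementation itself is embedded in the original post. As a minimal sketch of what such a source can look like (not the author’s exact code; it assumes Dropwizard Metrics 4.2+, where SettableGauge/DefaultSettableGauge live, and places the class under an org.apache.spark package because Spark’s Source trait is package-private):

```scala
package org.apache.spark.metrics.source

import com.codahale.metrics.{DefaultSettableGauge, MetricRegistry}

// Minimal sketch of a custom metrics source; metric and method names are illustrative.
class KafkaMetricsSource extends Source {
  override val sourceName: String = "KafkaMetrics"
  override val metricRegistry: MetricRegistry = new MetricRegistry

  private val gauges = scala.collection.mutable.Map.empty[String, DefaultSettableGauge[Long]]

  // Create and register a settable gauge on first use, reuse it afterwards.
  private def gauge(name: String): DefaultSettableGauge[Long] = synchronized {
    gauges.getOrElseUpdate(name, metricRegistry.register(name, new DefaultSettableGauge[Long](0L)))
  }

  def updateEndOffset(topic: String, partition: Int, offset: Long): Unit =
    gauge(s"endOffset.$topic.$partition").setValue(offset)

  def updateLag(topic: String, partition: Int, lag: Long): Unit =
    gauge(s"lagOffset.$topic.$partition").setValue(lag)

  def updateLastRecordTimestamp(topic: String, partition: Int, epochMillis: Long): Unit =
    gauge(s"lastRecordTimestamp.$topic.$partition").setValue(epochMillis)
}
```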

After the source is defined, all we need to do is instantiate it and register it in Spark’s MetricsSystem.

A simplified version of source registration
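Here is a minimal sketch of that registration, again using the hypothetical KafkaMetricsSource above; SparkEnv’s MetricsSystem is package-private too, hence the package declaration:

```scala
package org.apache.spark.metrics.source

import org.apache.spark.SparkEnv

object KafkaMetricsSource {
  // One instance per JVM (driver or executor), registered with that JVM's
  // MetricsSystem the first time it is accessed.
  lazy val instance: KafkaMetricsSource = {
    val source = new KafkaMetricsSource()
    SparkEnv.get.metricsSystem.registerSource(source)
    source
  }
}
```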

For the full code of the source implementation, you can check:

Now that we have our source in place, all we need to do is make use of it. For that, we need metric values to populate the gauges we just created.

If you have run a Structured Streaming job before, you might have noticed output similar to the following when a streaming query progresses:

By analyzing the output, we can see that we already have most of the metrics available just by parsing the JSON: the start offsets metric comes from startOffset, end offsets from endOffset, lead offsets from latestOffset, and the consumed rate for the source from inputRowsPerSecond. For the lag offsets we won’t use the reported metric values; instead, we can calculate the lag for each TopicPartition from the endOffset and latestOffset values, ending up with a more granular metric.

In order to make use of this information, we can leverage the StreamingQueryListener’s onQueryProgress callback.

The first step is creating a class that extends StreamingQueryListener so that we are able to receive and act upon the progress events. It receives the KafkaMetricsSource we previously created, which is responsible for emitting the metrics.

KafkaOffsetsQueryListener snippet
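Since the snippet is embedded in the original post, here is a simplified sketch of such a listener, assuming the KafkaMetricsSource sketched earlier and using json4s (which ships with Spark) to parse the offset JSON:

```scala
import org.apache.spark.metrics.source.KafkaMetricsSource
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
import org.json4s._
import org.json4s.jackson.JsonMethods

// Simplified sketch: on every progress event, parse the Kafka offsets reported by
// each source and push end offsets and per-partition lag into the metrics source.
class KafkaOffsetsQueryListener(metrics: KafkaMetricsSource) extends StreamingQueryListener {

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit =
    event.progress.sources.foreach { src =>
      val end  = parseOffsets(src.endOffset)
      val lead = parseOffsets(src.latestOffset)
      end.foreach { case ((topic, partition), offset) =>
        metrics.updateEndOffset(topic, partition, offset)
        lead.get((topic, partition)).foreach(l => metrics.updateLag(topic, partition, l - offset))
      }
    }

  // Kafka offsets are reported as JSON strings such as {"my-topic":{"0":42,"1":37}}.
  private def parseOffsets(json: String): Map[(String, Int), Long] =
    Option(json).map(JsonMethods.parse(_)).toList.flatMap {
      case JObject(topics) =>
        topics.flatMap {
          case (topic, JObject(partitions)) =>
            partitions.collect { case (p, JInt(offset)) => (topic, p.toInt) -> offset.toLong }
          case _ => Nil
        }
      case _ => Nil
    }.toMap
}
```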

The last step is simply registering the listener on the streams from which we want to receive updates.
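With the sketches above, the wiring on the driver could look like this (spark is the SparkSession; all names are the hypothetical ones from the sketches):

```scala
// Register the source with the driver's MetricsSystem and attach the listener
// to the session's StreamingQueryManager so it sees every query's progress events.
val metricsSource = org.apache.spark.metrics.source.KafkaMetricsSource.instance
spark.streams.addListener(new KafkaOffsetsQueryListener(metricsSource))
```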

If you wish to check the full listener code, you can do so here:

The last metric is the last processed record timestamp for each TopicPartition. This metric is trickier than the previous ones because the information only exists in the Spark executors at processing time.

The only way we can access that information is by creating a dummy Spark expression that works through a side effect. We’ll use the value column from Spark’s Kafka schema as a way to trick Spark into running this transformation.

KafkaTimestampMetrics expression

This expression receives the whole Kafka row, extracts the necessary values, and emits the metric. It returns the value column which, as mentioned above, is what tricks Spark into keeping the expression in the plan.
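The expression code itself is embedded in the original post. As an illustrative stand-in (a plain UDF rather than the author’s custom Catalyst expression), the same side-effect trick can be sketched like this, reusing the hypothetical KafkaMetricsSource.instance so the source also gets registered on each executor:

```scala
import java.sql.Timestamp

import org.apache.spark.metrics.source.KafkaMetricsSource
import org.apache.spark.sql.functions.udf

// Emits the record timestamp as a metric on the executor and passes the value
// column through unchanged, so Spark has to evaluate it.
val recordTimestampMetric =
  udf { (topic: String, partition: Int, timestamp: Timestamp, value: Array[Byte]) =>
    if (timestamp != null) {
      // Lazily registers the source with the executor's MetricsSystem on first use.
      KafkaMetricsSource.instance.updateLastRecordTimestamp(topic, partition, timestamp.getTime)
    }
    value
  }
```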

Expression usage
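With the stand-in above, usage would be along these lines (kafkaDf is an assumed DataFrame read from the Kafka source):

```scala
import org.apache.spark.sql.functions.col

// Replace value with the pass-through result of the metric-emitting function,
// so the side effect survives as long as the value column is selected downstream.
val dfWithMetrics = kafkaDf.withColumn(
  "value",
  recordTimestampMetric(col("topic"), col("partition"), col("timestamp"), col("value"))
)
```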

If we, for instance, wrapped the timestamp column instead and ended up selecting only the value column further down the query, Spark would prune this expression out of the final plan and our metric would never be emitted. If you don’t plan to use the value column (which is rather uncommon), make sure to wrap a column you actually use.

After everything is set up, all we need is to enable the Prometheus endpoint on the UI by setting spark.ui.prometheus.enabled=true (it creates a single endpoint containing both driver and executor metrics) and to configure spark.metrics.conf with the required settings.
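As a concrete example of those settings, passed here as spark.metrics.conf.-prefixed properties on the session builder (an equivalent metrics.properties file works just as well):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kafka-metrics-example") // hypothetical application name
  // Single UI endpoint exposing the application's metrics in Prometheus format.
  .config("spark.ui.prometheus.enabled", "true")
  // Equivalent to metrics.properties entries: register the PrometheusServlet sink.
  .config("spark.metrics.conf.*.sink.prometheusServlet.class",
    "org.apache.spark.metrics.sink.PrometheusServlet")
  .config("spark.metrics.conf.*.sink.prometheusServlet.path", "/metrics/prometheus")
  .getOrCreate()
```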

Running the application and accessing http://localhost:4040/metrics/prometheus will show you all the metrics we’ve previously created alongside the native ones.

Metrics exposed on /metrics/prometheus/ endpoint

There are a few limitations to this new feature. One of them is that this endpoint only exposes metrics whose names start with metrics_ or spark_info. In addition, Spark does not follow the Prometheus naming conventions, and labels aren’t currently supported (not that I know of; if you know a way, hit me up!). This means that we’ll have a lot of different metric names in Prometheus, which might be a bit overwhelming. This can be worked around by relabeling the metrics, but that can be troublesome.

That’s it! You are now able to create and expose custom Kafka metrics using Spark’s native Prometheus integration. Now, all that’s left is to have Prometheus scrape the endpoints and use the metrics to build both pretty dashboards and alarms that give you more visibility into what’s happening inside your applications.

Despite the early limitations of the Prometheus metrics sink, which are understandable given that it is an experimental feature, I believe it will soon be enhanced with more and more customization options.

If you wish to see the full project used in this article, please check:

