Autoscaling with Prometheus, Alertmanager and SCDF Scale API
To learn about the basic scaling concepts in Spring Cloud Data Flow, see the Scaling guide.
Overview
The solution uses Prometheus Alert Rules to define scale-out and scale-in alerts based on application throughput metrics. The alerts are managed by the Prometheus Alertmanager, which forwards them to a custom webhook that in turn triggers Scale API calls in SCDF.
For a streaming data pipeline (time | transform | log), we show how to measure the throughput rates of the time and transform applications, so that the difference between them can serve as the deciding factor for alerts. We discuss how the alerts fire when that difference crosses the thresholds defined in the alert rules and how they trigger the autoscale calls.
The following pseudocode illustrates the logic of such alert rules:
rateDifference = rate(time) - rate(transform)
if rateDifference > 500 for 1 minute do fire HighThroughputDifference
if rateDifference == 0 for 3 minutes do fire ZeroThroughputDifference
- Query expression that computes, in real time, the throughput (that is, the rate) difference between the time and transform applications.
- HighThroughputDifference alert rule that fires when the rate difference exceeds 500 msg/s for a duration of 1 min.
- ZeroThroughputDifference alert rule that fires if the rate difference stays 0 msg/s for at least 3 min.
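The pseudocode above can be sketched as a small simulation. This is a minimal illustration, not how Prometheus is implemented: the AlertRule class, the first_fired helper, and the sample format (timestamp in seconds, rate of time, rate of transform) are hypothetical names introduced here. It only mirrors the idea that a condition must hold continuously for the configured duration before the alert fires.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Optional, Tuple


@dataclass
class AlertRule:
    """Hypothetical stand-in for an alert rule with a hold ("for") duration."""
    name: str
    condition: Callable[[float], bool]  # predicate over the rate difference
    hold_seconds: float                 # how long the condition must hold
    pending_since: Optional[float] = None

    def evaluate(self, now: float, rate_difference: float) -> bool:
        """Return True once the condition has held for hold_seconds."""
        if not self.condition(rate_difference):
            self.pending_since = None   # condition broken: reset the timer
            return False
        if self.pending_since is None:
            self.pending_since = now    # condition just became true
        return now - self.pending_since >= self.hold_seconds


def first_fired(rule: AlertRule,
                samples: Iterable[Tuple[float, float, float]]) -> Optional[float]:
    """Return the first timestamp at which the rule fires, or None."""
    for now, rate_time, rate_transform in samples:
        if rule.evaluate(now, rate_time - rate_transform):
            return now
    return None


def high_rule() -> AlertRule:
    return AlertRule("HighThroughputDifference", lambda d: d > 500, 60)


def zero_rule() -> AlertRule:
    return AlertRule("ZeroThroughputDifference", lambda d: d == 0, 180)


if __name__ == "__main__":
    # One sample every 15 s: time emits 5000 msg/s, transform handles 1000 msg/s.
    overloaded = [(t, 5000.0, 1000.0) for t in range(0, 121, 15)]
    print(first_fired(high_rule(), overloaded))
    print(first_fired(zero_rule(), overloaded))
```

Resetting the timer whenever the condition stops holding is what keeps a short spike from triggering a scale-out.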
The following diagram shows the high-level architecture.
The Data Flow metrics architecture is designed with the help of the Micrometer library. Prometheus can be one of the monitoring backends that can collect various application metrics for performance analysis, and it allows for alert configurations as well.
The alerting in Prometheus is divided into:
- Alert Rules: Defined inside and triggered by the Prometheus service.
- Alertmanager: A standalone service that receives and manages the fired alerts and in turn sends out notifications to pre-registered webhooks.
The Alert Rules are based on the Prometheus query language (PromQL). They are used to define and send the scale alerts to Alertmanager.
For example, the scale-out alert rule definition looks something like this:
alert: HighThroughputDifference
expr: avg(irate(spring_integration_send_seconds_count{application_name="time"}[1m])) by(stream_name) -
  avg(irate(spring_integration_send_seconds_count{application_name="transform"}[1m])) by(stream_name) > 500
for: 30s
See alert.rule.yml (and its counterpart in the kubectl installation) for the alert rule definitions in use.
The spring_integration_send_seconds_count metric comes from the Spring Integration Micrometer support and is used for computing the message rate.
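To make the rate computation concrete, the following sketch shows what an irate-style calculation does with a counter such as spring_integration_send_seconds_count: it takes the last two samples in the window and divides the increase by the elapsed time. The function name and the (timestamp, value) sample format are assumptions made for illustration, and the sample values are invented. The real expression additionally averages the result per stream_name.

```python
from typing import Optional, Sequence, Tuple


def irate(samples: Sequence[Tuple[float, float]]) -> Optional[float]:
    """Per-second rate from the last two (timestamp_seconds, counter_value) samples."""
    if len(samples) < 2:
        return None                      # not enough data in the window
    (t1, v1), (t2, v2) = samples[-2], samples[-1]
    if t2 <= t1:
        return None                      # samples must be strictly ordered in time
    increase = v2 - v1
    if increase < 0:
        increase = v2                    # counter reset: count from zero again
    return increase / (t2 - t1)


if __name__ == "__main__":
    # Invented counter samples scraped 15 s apart.
    time_samples = [(0.0, 0.0), (15.0, 75000.0)]
    transform_samples = [(0.0, 0.0), (15.0, 15000.0)]
    difference = irate(time_samples) - irate(transform_samples)
    print(difference, difference > 500)
```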
The Alertmanager is a standalone service that manages the alerts, including silencing, inhibition, aggregation, and sending out notifications to pre-configured webhooks.
The AlertWebHookApplication (part of the scdf-alert-webhook Spring Boot app) is a custom Spring Boot application and is registered as an Alertmanager Webhook Receiver in config.yml.
The AlertWebHookApplication receives the alert notifications (in JSON format) from Alertmanager. With the help of SCDF's Scale API, it can then trigger a scale-out request to autoscale the applications of the streaming data pipeline that the alert refers to.
The alert notifications also contain the metrics labels used in the alert PromQL expressions.
For our example, that means that the stream_name label is passed along with the alert notifications, letting the AlertWebHookApplication determine the name of the data pipeline that has to be scaled.
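The label handling described above can be sketched as follows. This is not the AlertWebHookApplication source: the streams_to_scale function and the "scale-out"/"scale-in" action strings are hypothetical. The payload shape (a top-level alerts array whose entries carry status and labels, including alertname) follows the Alertmanager webhook format, and the sample payload is invented for illustration.

```python
import json
from typing import List, Tuple

# Hypothetical mapping from alert name to the scaling action it should trigger.
ACTIONS = {
    "HighThroughputDifference": "scale-out",
    "ZeroThroughputDifference": "scale-in",
}


def streams_to_scale(payload_json: str) -> List[Tuple[str, str]]:
    """Return (stream_name, action) pairs for every firing alert in the payload."""
    payload = json.loads(payload_json)
    result = []
    for alert in payload.get("alerts", []):
        if alert.get("status") != "firing":
            continue                      # skip resolved alerts
        labels = alert.get("labels", {})
        action = ACTIONS.get(labels.get("alertname"))
        stream = labels.get("stream_name")
        if action is not None and stream is not None:
            result.append((stream, action))
    return result


if __name__ == "__main__":
    sample = json.dumps({
        "status": "firing",
        "alerts": [
            {"status": "firing",
             "labels": {"alertname": "HighThroughputDifference",
                        "stream_name": "scaletest"}},
        ],
    })
    print(streams_to_scale(sample))
```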
The Data Flow Scale REST API provides a platform-agnostic mechanism for scaling data pipeline applications.
The AlertWebHookApplication uses the spring.cloud.dataflow.client.server-uri property to configure the Scale API endpoint. See alertwebhook-deployment.yaml for the entire deployment configuration.
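As a rough illustration of what a Scale API call could look like, the sketch below builds (but does not send) an HTTP request. The path /streams/deployments/scale/{stream}/{app}/instances/{count} is an assumption based on the Data Flow REST API guide and should be checked against your server version. The build_scale_request function and the http://scdf-server:80 address are hypothetical.

```python
from urllib.parse import quote
from urllib.request import Request


def build_scale_request(server_uri: str, stream_name: str,
                        app_name: str, instances: int) -> Request:
    """Build a POST request that asks SCDF to run `instances` copies of an app."""
    if instances < 1:
        raise ValueError("instances must be at least 1")
    # Assumed endpoint shape; verify against the REST API guide of your SCDF version.
    path = "/streams/deployments/scale/{}/{}/instances/{}".format(
        quote(stream_name, safe=""), quote(app_name, safe=""), instances)
    return Request(server_uri.rstrip("/") + path, method="POST")


if __name__ == "__main__":
    # Hypothetical server address; server_uri plays the role of the
    # spring.cloud.dataflow.client.server-uri property.
    request = build_scale_request("http://scdf-server:80", "scaletest", "transform", 4)
    print(request.get_method(), request.full_url)
    # Sending it would be: urllib.request.urlopen(request)
```

Keeping request construction separate from sending makes the call easy to inspect before pointing it at a live server.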
The following video animation shows the Data Flow auto-scaling flow:
Prerequisite
This recipe uses the Kubernetes platform.
The recipe has been successfully tested on a GKE cluster with five nodes.
The high CPU requirements due to the multiple app instances would make it difficult or impossible to run the recipe on minikube.
For a kubectl-based installation, follow the Kubectl installation instructions to set up Spring Cloud Data Flow with a Kafka broker.
Then install the Alertmanager and the AlertWebHook and reconfigure the Prometheus services:
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/alertwebhook/alertwebhook-svc.yaml
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/alertwebhook/alertwebhook-deployment.yaml
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/alertmanager/prometheus-alertmanager-service.yaml
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/alertmanager/prometheus-alertmanager-deployment.yaml
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/alertmanager/prometheus-alertmanager-configmap.yaml
wget https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/prometheus/prometheus-configmap.yaml
kubectl patch cm my-release-prometheus-server --patch "$(cat ./prometheus-configmap.yaml)"
kubectl delete pods -l app=prometheus
Replace my-release-prometheus-server with the name of your current Prometheus ConfigMap (CM). Use kubectl get cm to list your configurations.
For a Helm-based installation, follow the Helm installation instructions to set up Spring Cloud Data Flow with a Kafka broker. You can use features.monitoring.enabled=true with at least 10Gi of storage space:
helm install --name my-release stable/spring-cloud-data-flow --set features.monitoring.enabled=true,kafka.enabled=true,rabbitmq.enabled=false,kafka.persistence.size=10Gi
Then install the Alertmanager and the AlertWebHook and reconfigure the Prometheus services:
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/helm/alertwebhook/alertwebhook-svc.yaml
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/helm/alertwebhook/alertwebhook-deployment.yaml
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/alertmanager/prometheus-alertmanager-service.yaml
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/alertmanager/prometheus-alertmanager-deployment.yaml
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/alertmanager/prometheus-alertmanager-configmap.yaml
wget https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/helm/prometheus/prometheus-configmap.yaml
kubectl patch cm my-release-prometheus-server --patch "$(cat ./prometheus-configmap.yaml)"
Register the latest kafka-docker app starters:
app import --uri https://dataflow.spring.io/kafka-docker-latest
Start an SCDF Shell and connect it to your Data Flow Server:
server-unknown:>dataflow config server http://<SCDF IP>
Autoscaling Recipe
This recipe shows how to set up autoscaling.
Create a Data Pipeline
stream create --name scaletest --definition "time --fixed-delay=995 --time-unit=MILLISECONDS | transform --expression=\"payload + '-' + T(java.lang.Math).exp(700)\" | log"
The time source generates current timestamp messages at a fixed time interval (995 ms, that is, ~1 msg/s), the transform processor performs a math operation to simulate CPU-intensive processing, and the log sink prints the transformed message payload.
Deploy a Data Pipeline with Data Partitioning
stream deploy --name scaletest --properties "app.time.producer.partitionKeyExpression=payload,app.transform.spring.cloud.stream.kafka.binder.autoAddPartitions=true,app.transform.spring.cloud.stream.kafka.binder.minPartitionCount=4"
The producer.partitionKeyExpression=payload property configures the time source's output binding for partitioning. The partition key expression uses the message payload (that is, the toString() value of the current timestamp) to compute how the data needs to be partitioned to the downstream output channels.
The spring.cloud.stream.kafka.binder.autoAddPartitions deployment property instructs the Kafka binder to create new partitions when required. This is required if the topic is not already over-partitioned.
The spring.cloud.stream.kafka.binder.minPartitionCount property sets the minimum number of partitions that the Kafka binder configures on the topic from which the transform processor consumes new data.
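The interplay of the partition key and the partition count can be sketched as below. This is an approximation under stated assumptions, not the binder's code: it assumes the default selection behaves roughly like "hash code of the key modulo the partition count", and it emulates Java's String.hashCode() in Python. The java_string_hash and select_partition names are hypothetical.

```python
def java_string_hash(text: str) -> int:
    """Emulate Java's String.hashCode() as a signed 32-bit integer.

    Matches Java for characters in the Basic Multilingual Plane, which covers
    the timestamp payloads used in this recipe.
    """
    h = 0
    for ch in text:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF      # keep 32 bits, like Java int overflow
    return h - 0x100000000 if h >= 0x80000000 else h


def select_partition(key: str, partition_count: int) -> int:
    """Assumed default: absolute hash code of the key modulo the partition count."""
    if partition_count < 1:
        raise ValueError("partition_count must be at least 1")
    return abs(java_string_hash(key)) % partition_count


if __name__ == "__main__":
    # With minPartitionCount=4, every payload lands on one of partitions 0..3.
    for payload in ["ab", "hello", "06/30/20 10:15:01"]:
        print(payload, "->", select_partition(payload, 4))
```

Because the key is the payload itself, consecutive timestamps spread across the partitions, which is what lets several transform instances share the load.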
Use SCDF's built-in Grafana dashboard to review the stream applications' throughput and other application metrics:
The time, transform, and log applications maintain the same message throughput (~1 msg/s). The transform processor handles the current load.
Increase a Data Pipeline Load
Now we increase the load by raising the time source's message production rate. Changing the time-unit property of time from MILLISECONDS to MICROSECONDS increases the input rate from one to thousands of messages per second.
Note that the stream rolling-update functionality lets you perform a rolling update of the time application without stopping the entire stream:
stream update --name scaletest --properties "app.time.trigger.time-unit=MICROSECONDS"
The time app is re-deployed with the new time-unit property:
Now the time source emits messages at a rate of ~5000 msg/s. The transform processor, though, is capped at around 1000 msg/s and, in turn, limits the throughput of the entire stream. This is an indicator that the transform processor has become the bottleneck.
The HighThroughputDifference Prometheus alert rule detects the rate discrepancy and fires a scale-out alert:
This results in three additional transform instances being added:
With the help of the additional instances of the transform processor, the entire data pipeline catches up to match the time source's production rate:
Reduce a Data Pipeline Load
Suppose we reduce the time source's data production rate back to the original rate (that is, 1 msg/s):
stream update --name scaletest --properties "app.time.trigger.time-unit=MILLISECONDS"
The extra transform processor instances no longer alter the overall throughput rate.
Eventually, the rate difference becomes zero and the ZeroThroughputDifference alert is fired. This alert, in turn, triggers a scale-in action that removes the extra instances:
With a single transform instance, the throughput of the entire data pipeline returns to ~1 msg/s.