Scale Apache Kafka Applications on Vultr Kubernetes Engine with KEDA

Updated on June 22, 2024

Introduction

Kafka is a distributed event streaming platform designed for high-throughput, fault-tolerant, and scalable data streaming and processing. It uses a producer-consumer architecture: producers write messages to Kafka topics, and consumers read messages from those topics. Consumer groups let a set of consumers share the processing work. Kafka assigns each partition of a topic to exactly one consumer in the group, so each message is processed by only one consumer in that group.
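To make the partition rule concrete, the following Go sketch mimics Kafka's range assignment strategy for a single topic. The assignRange helper is hypothetical and illustrative only; the franz-go client used later in this guide defaults to a cooperative-sticky balancer, so its actual assignments can differ. The property it shows holds for every strategy: each partition has one owner in the group, so consumers beyond the partition count sit idle.

```go
package main

import "fmt"

// assignRange is a hypothetical helper that mimics Kafka's range assignment
// strategy for one topic: partitions are split into contiguous runs, and the
// first (partitions % consumers) consumers each receive one extra partition.
func assignRange(partitions, consumers int) [][]int {
	assignment := make([][]int, consumers)
	if consumers == 0 {
		return assignment
	}
	perConsumer := partitions / consumers
	extra := partitions % consumers
	next := 0
	for c := 0; c < consumers; c++ {
		count := perConsumer
		if c < extra {
			count++
		}
		for i := 0; i < count; i++ {
			assignment[c] = append(assignment[c], next)
			next++
		}
	}
	return assignment
}

func main() {
	// Five partitions shared by two consumers: [[0 1 2] [3 4]]
	fmt.Println(assignRange(5, 2))
	// Five partitions and seven consumers: two consumers receive nothing.
	fmt.Println(assignRange(5, 7))
}
```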

Kubernetes Event-Driven Autoscaling (KEDA) is a CNCF project that provides event-driven autoscaling for container workloads. In KEDA, scalers are extensible components that monitor external systems, such as Kafka, and produce the metrics that drive scaling for Kubernetes workloads.

This guide explains how to use the KEDA scaler for Kafka on a Vultr Kubernetes Engine (VKE) cluster to drive auto-scaling and allow Kafka consumer application pods to scale up and down based on the consumer group lag.
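Consumer group lag is the number of messages that have been written to a partition but not yet committed by the group, summed across partitions. The following Go sketch computes total lag from a hypothetical set of per-partition offsets; in a real cluster, KEDA reads these offsets from the Kafka brokers.

```go
package main

import "fmt"

// partitionOffsets holds hypothetical offsets for one partition of a topic.
type partitionOffsets struct {
	latestOffset    int64 // offset that the next produced message will receive
	committedOffset int64 // next offset the consumer group will read
}

// totalLag sums, across all partitions, the messages written but not yet
// committed by the group. A negative difference is treated as zero lag.
func totalLag(parts map[int32]partitionOffsets) int64 {
	var sum int64
	for _, p := range parts {
		if lag := p.latestOffset - p.committedOffset; lag > 0 {
			sum += lag
		}
	}
	return sum
}

func main() {
	parts := map[int32]partitionOffsets{
		0: {latestOffset: 120, committedOffset: 100}, // 20 messages behind
		1: {latestOffset: 80, committedOffset: 80},   // fully caught up
		2: {latestOffset: 95, committedOffset: 90},   // 5 messages behind
	}
	fmt.Println("total lag:", totalLag(parts)) // total lag: 25
}
```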

Prerequisites

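Before you begin, make sure you have:

  • A Vultr Kubernetes Engine (VKE) cluster and its kubeconfig YAML file
  • kubectl installed on your workstation
  • Docker installed, and a Docker Hub account to host the application image
  • Go installed, to initialize the application module and download its dependencies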

Install the KEDA Operator

  1. Create a directory

     $ mkdir kafka-keda-vultr
  2. Switch to the directory

     $ cd kafka-keda-vultr
  3. Set the KUBECONFIG environment variable with the path to your VKE YAML file to grant kubectl access to the cluster

     $ export KUBECONFIG=/path/to/vke/YAML

    The above command makes kubectl use your VKE kubeconfig file instead of the default ~/.kube/config file for the current terminal session

  4. Deploy KEDA using its deployment YAML file

     $ kubectl apply --server-side -f https://github.com/kedacore/keda/releases/download/v2.11.2/keda-2.11.2-core.yaml
  5. Verify the KEDA deployment status

     $ kubectl get deployment -n keda

    Your output should look like the one below:

     NAME                     READY   UP-TO-DATE   AVAILABLE   AGE
     keda-metrics-apiserver   1/1     1            1           57s
     keda-operator            1/1     1            1           57s

    Verify that both deployments show 1/1 in the READY column
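Step 3 relies on how kubectl finds its configuration. When you don't pass the --kubeconfig flag, kubectl reads the file or files listed in KUBECONFIG and falls back to ~/.kube/config when the variable is empty. The Go sketch below models only that lookup order; the kubeconfigPaths function is hypothetical and does not merge file contents the way kubectl does.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// kubeconfigPaths is a hypothetical sketch of kubectl's lookup when no
// --kubeconfig flag is given: a non-empty KUBECONFIG holds one or more paths
// joined by the OS list separator (":" on Linux and macOS); otherwise the
// default ~/.kube/config is used.
func kubeconfigPaths(env, home string) []string {
	if env != "" {
		var paths []string
		for _, p := range filepath.SplitList(env) {
			if p != "" {
				paths = append(paths, p)
			}
		}
		return paths
	}
	return []string{filepath.Join(home, ".kube", "config")}
}

func main() {
	home, err := os.UserHomeDir()
	if err != nil {
		fmt.Println("cannot determine home directory:", err)
		return
	}
	fmt.Println(kubeconfigPaths(os.Getenv("KUBECONFIG"), home))
}
```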

Set Up a Single-Node Kafka Cluster Using the Strimzi Operator

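  1. Install the Strimzi operator in the default namespace

     $ kubectl create -f 'https://strimzi.io/install/latest?namespace=default'

    The latest URL installs the newest Strimzi release. Recent Strimzi releases support only newer Kafka versions and no longer run ZooKeeper-based clusters, so they may reject the Kafka 3.4.0 cluster defined below. If the cluster never becomes ready, install an earlier Strimzi release that supports Kafka 3.4.0
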
  2. Using a text editor such as Nano, create a new file kafka-cluster.yaml

     $ nano kafka-cluster.yaml
  3. Add the following contents to the file

     apiVersion: kafka.strimzi.io/v1beta2
     kind: Kafka
     metadata:
       name: my-cluster
     spec:
       kafka:
         version: 3.4.0
         replicas: 1
         listeners:
           - name: plain
             port: 9092
             type: internal
             tls: false
         config:
           offsets.topic.replication.factor: 1
           transaction.state.log.replication.factor: 1
           transaction.state.log.min.isr: 1
           default.replication.factor: 1
           min.insync.replicas: 1
           inter.broker.protocol.version: "3.4"
         storage:
           type: ephemeral
       zookeeper:
         replicas: 1
         storage:
           type: ephemeral
       entityOperator:
         topicOperator: {}
         userOperator: {}

    Save and close the file

  4. Deploy the Kafka cluster

     $ kubectl apply -f kafka-cluster.yaml
  5. Cluster creation can take a few minutes. Run the following command to wait until the cluster is ready

     $ kubectl wait kafka/my-cluster --for=condition=Ready --timeout=600s

    When complete, the following output should display:

     kafka.kafka.strimzi.io/my-cluster condition met
  6. Create a topic named test-topic with 5 partitions

     $ kubectl run kafka-topics -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --create --topic test-topic --partitions 5 --replication-factor 1

    Your output should look like the one below:

     Created topic test-topic.
     pod "kafka-topics" deleted
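The manifest sets every replication-related option to 1 because the cluster runs a single broker: Kafka cannot place more replicas of a partition than there are brokers, and its default replication factor of 3 for the internal offsets topic would not fit. The hypothetical checkReplication function below sketches that rule so you can reason about the settings before applying the manifest; it is not part of Strimzi or Kafka.

```go
package main

import (
	"errors"
	"fmt"
)

// checkReplication is a hypothetical pre-flight check for replication
// settings. A replication factor larger than the broker count cannot be
// satisfied, and min.insync.replicas larger than the replication factor
// makes producers that use acks=all fail.
func checkReplication(brokers, replicationFactor, minInsyncReplicas int) error {
	if replicationFactor > brokers {
		return fmt.Errorf("replication factor %d exceeds broker count %d", replicationFactor, brokers)
	}
	if minInsyncReplicas > replicationFactor {
		return errors.New("min.insync.replicas exceeds the replication factor")
	}
	return nil
}

func main() {
	// Values from kafka-cluster.yaml: one broker, every setting is 1.
	fmt.Println(checkReplication(1, 1, 1)) // <nil>
	// A replication factor of 3 cannot fit on a single broker.
	fmt.Println(checkReplication(1, 3, 1))
}
```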

Prepare the Kafka Consumer Application

  1. Create a new Go module

     $ go mod init kafka-consumer
  2. Create a new file main.go

     $ nano main.go
  3. Add the following contents to the file

     package main
    
     import (
         "context"
         "fmt"
         "log"
         "os"
         "os/signal"
         "strings"
         "syscall"
         "time"
    
         "github.com/twmb/franz-go/pkg/kgo"
     )
    
     const consumerGroupName = "my-group"
    
     var kafkaBroker string
     var topic string
     var client *kgo.Client
    
     func init() {
    
         kafkaBroker = os.Getenv("KAFKA_BROKER")
         if kafkaBroker == "" {
             log.Fatal("missing env var KAFKA_BROKER")
         }
    
         topic = os.Getenv("KAFKA_TOPIC")
         if topic == "" {
             log.Fatal("missing env var KAFKA_TOPIC")
         }
    
         fmt.Println("KAFKA_BROKER", kafkaBroker)
         fmt.Println("KAFKA_TOPIC", topic)
    
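         opts := []kgo.Opt{
             kgo.SeedBrokers(strings.Split(kafkaBroker, ",")...),
             kgo.ConsumeTopics(topic),
             kgo.ConsumerGroup(consumerGroupName),
             // commit offsets only through the CommitRecords call in main, so the
             // committed offsets (and the lag KEDA reads) track processed records
             kgo.DisableAutoCommit(),
             kgo.OnPartitionsAssigned(partitionsAssigned),
             kgo.OnPartitionsRevoked(partitionsRevoked),
             kgo.OnPartitionsLost(partitionsLost),
         }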
    
         var err error
    
         client, err = kgo.NewClient(opts...)
    
         if err != nil {
             log.Fatal(err)
         }
     }
    
     func main() {
    
         go func() {
             fmt.Println("starting kafka consumer goroutine")
             for {
                 err := client.Ping(context.Background())
                 if err != nil {
                     log.Fatal("ping failed - ", err)
                 }
                 fmt.Println("fetching records....")
    
                 fetches := client.PollRecords(context.Background(), 0)
    
                 if fetches.IsClientClosed() {
                     fmt.Println("kgo kafka client closed")
                     return
                 }
                 fetches.EachError(func(t string, p int32, err error) {
                     fmt.Printf("fetch err - topic %s partition %d: %v\n", t, p, err)
                 })
    
                 // process each record, then commit its offset
    
                 fetches.EachRecord(func(r *kgo.Record) {
                     fmt.Printf("got record from partition %v key=%s val=%s\n", r.Partition, string(r.Key), string(r.Value))
    
                     // simulate slow processing so that consumer lag builds up
                     time.Sleep(3 * time.Second)
                     err = client.CommitRecords(context.Background(), r)
    
                     if err != nil {
                         fmt.Println("commit failed for record with offset", r.Offset, "in partition", r.Partition)
                     } else {
                         fmt.Println("committed record with offset", r.Offset, "in partition", r.Partition)
                     }
                 })
             }
         }()
    
         end := make(chan os.Signal, 1)
         signal.Notify(end, syscall.SIGINT, syscall.SIGTERM)
    
         <-end
    
         client.Close()
         fmt.Println("kafka consumer exit")
     }
    
     func partitionsAssigned(ctx context.Context, c *kgo.Client, m map[string][]int32) {
         fmt.Printf("partitions ASSIGNED for topic %s %v\n", topic, m[topic])
     }
    
     func partitionsRevoked(ctx context.Context, c *kgo.Client, m map[string][]int32) {
         fmt.Printf("partitions REVOKED for topic %s %v\n", topic, m[topic])
     }
    
     func partitionsLost(ctx context.Context, c *kgo.Client, m map[string][]int32) {
         fmt.Printf("partitions LOST for topic %s %v\n", topic, m[topic])
     }

    Save and close the file.

    Here is what each part of the application does:

    • The init function:
      • Reads the Kafka broker addresses and topic name from the KAFKA_BROKER and KAFKA_TOPIC environment variables
      • Exits with an error if either variable is missing
      • Creates the Kafka client with the seed broker addresses, the topic to consume, the consumer group name, and callbacks that log when partitions are assigned, revoked, or lost
    • The main function:
      • Starts a goroutine that pings the brokers, polls for Kafka records, and logs any fetch errors
      • Processes each record by printing its details and sleeping for 3 seconds to simulate work, then commits the record's offset to Kafka
      • Listens for SIGINT or SIGTERM, so you can stop the application with Ctrl + C, or Kubernetes can stop it with a termination signal
      • Closes the Kafka client and exits when it receives a termination signal
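The application commits each record only after processing it, which gives at-least-once delivery: if a pod stops after processing a record but before committing it, the next consumer that owns the partition processes that record again. The Go sketch below simulates this with a hypothetical in-memory partitionLog type rather than a real Kafka client. Like Kafka, it stores the committed offset as the next offset to read.

```go
package main

import "fmt"

// partitionLog is a hypothetical in-memory stand-in for one Kafka partition
// and the consumer group's committed offset for it.
type partitionLog struct {
	records   []string
	committed int64 // next offset to read, as Kafka stores it
}

// consume processes records from the committed offset onward and commits
// offset+1 after each one. If crashAfter >= 0, it stops right after
// processing that offset and before committing it, simulating a pod that is
// killed mid-record. It returns the offsets it processed.
func (p *partitionLog) consume(crashAfter int64) []int64 {
	var processed []int64
	for off := p.committed; off < int64(len(p.records)); off++ {
		processed = append(processed, off) // "process" the record
		if off == crashAfter {
			return processed // crash: the commit below never happens
		}
		p.committed = off + 1 // commit only after processing succeeds
	}
	return processed
}

func main() {
	p := &partitionLog{records: []string{"a", "b", "c", "d"}}
	first := p.consume(2)
	fmt.Println(first, p.committed) // [0 1 2] 2
	// A replacement consumer resumes at the committed offset, so offset 2
	// is processed a second time.
	second := p.consume(-1)
	fmt.Println(second, p.committed) // [2 3] 4
}
```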

Deploy the Application to your VKE Cluster

  1. Create a new Dockerfile that defines how the application image is built

     $ nano Dockerfile
  2. Add the following contents to the file

     FROM golang:1.22-bookworm AS build
    
     WORKDIR /app
     COPY go.mod ./
     COPY go.sum ./
    
     RUN go mod download
    
     COPY main.go ./
     RUN go build -o /kafka-go-app
    
     FROM gcr.io/distroless/base-debian12
     WORKDIR /
     COPY --from=build /kafka-go-app /kafka-go-app
     USER nonroot:nonroot
     ENTRYPOINT ["/kafka-go-app"]

    Save and close the file

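    The above Dockerfile uses a multi-stage build: the first stage compiles the Go application, and the second stage copies only the compiled binary into a minimal distroless image that runs as a non-root user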

  3. Download the Go module dependencies and generate the go.sum file

     $ go mod tidy
  4. Log in to Docker using your active Docker Hub account

     $ sudo docker login
  5. Build the Docker image. Replace example-user with your actual Docker Hub ID

     $ sudo docker build -t example-user/myapp .
  6. Push the image to Docker Hub

     $ sudo docker push example-user/myapp

    Verify that the command is successful and a new myapp repository is available on your Docker Hub profile

  7. Create a new file consumer.yaml

     $ nano consumer.yaml
  8. Add the following contents to the file. Replace example-user/myapp with your actual Docker repository

     apiVersion: apps/v1
     kind: Deployment
     metadata:
       name: kafka-consumer-app
       labels:
         app: kafka-consumer-app
     spec:
       replicas: 1
       selector:
         matchLabels:
           app: kafka-consumer-app
       template:
         metadata:
           labels:
             app: kafka-consumer-app
         spec:
           containers:
           - name: kafka-consumer-app-container
             image: example-user/myapp
             imagePullPolicy: Always
             env:
               - name: KAFKA_BROKER
                 value: my-cluster-kafka-bootstrap:9092
               - name: KAFKA_TOPIC
                 value: test-topic

    Save and close the file

  9. Deploy the application to your cluster

     $ kubectl apply -f consumer.yaml
  10. Verify the application deployment status

     $ kubectl get pods -l=app=kafka-consumer-app

    Your output should look like the one below:

     NAME                                 READY   STATUS    RESTARTS   AGE
     kafka-consumer-app-c4b67d694-mptlw   1/1     Running   0          2m12s

    Verify that the Pod status changes to Running

  11. View the application logs

     $ kubectl logs -f -l=app=kafka-consumer-app

    Output:

     KAFKA_BROKER my-cluster-kafka-bootstrap:9092
     KAFKA_TOPIC test-topic
     starting kafka consumer goroutine
     fetching records....
     partitions ASSIGNED for topic test-topic [0 1 2 3 4]
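The KAFKA_BROKER value in consumer.yaml is a single bootstrap address, but main.go splits the variable on commas, so it can also hold several addresses. strings.Split keeps surrounding spaces and empty entries, so a value such as "a:9092, b:9092," yields seed addresses with a stray space and an empty entry. The parseBrokers function below is a hypothetical, stricter alternative you could swap into main.go.

```go
package main

import (
	"fmt"
	"strings"
)

// parseBrokers is a hypothetical replacement for the strings.Split call in
// main.go: it trims spaces around each entry and drops empty entries.
func parseBrokers(value string) []string {
	var brokers []string
	for _, b := range strings.Split(value, ",") {
		if b = strings.TrimSpace(b); b != "" {
			brokers = append(brokers, b)
		}
	}
	return brokers
}

func main() {
	// The value used in consumer.yaml.
	fmt.Println(parseBrokers("my-cluster-kafka-bootstrap:9092"))
	// A sloppy list still yields two clean addresses.
	fmt.Println(len(parseBrokers("a:9092, b:9092,"))) // 2
}
```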

Enable Autoscaling

  1. Create a new file scaled-object.yaml

     $ nano scaled-object.yaml
  2. Add the following contents to the file

     apiVersion: keda.sh/v1alpha1
     kind: ScaledObject
     metadata:
       name: kafka-scaledobject
     spec:
       scaleTargetRef:
         name: kafka-consumer-app
       minReplicaCount: 1
       maxReplicaCount: 5
       pollingInterval: 30
       triggers:
       - type: kafka
         metadata:
           bootstrapServers: my-cluster-kafka-bootstrap.default.svc.cluster.local:9092
           consumerGroup: my-group
           topic: test-topic
           lagThreshold: "5"
           offsetResetPolicy: latest

    Save and close the file

  3. Deploy the KEDA scaled object

     $ kubectl apply -f scaled-object.yaml

Verify the Consumer Application Autoscaling

  1. Monitor the number of consumer application pods

     $ kubectl get pods -l=app=kafka-consumer-app -w
  2. In a new terminal window, set the KUBECONFIG variable so that kubectl in the new session targets your VKE cluster

     $ export KUBECONFIG=/path/to/vke/YAML
  3. Run the following command to send data to the Kafka topic

     $ kubectl run kafka-producer -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-producer-perf-test.sh --throughput 200 --record-size 1000 --num-records 500 --topic test-topic --print-metrics --producer-props linger.ms=0 batch.size=16384 bootstrap.servers=my-cluster-kafka-bootstrap:9092

    The above command uses Kafka's bundled kafka-producer-perf-test.sh script to send 500 records of 1000 bytes each to test-topic at up to 200 records per second.

    When successful, your output should look like the one below:

     producer-topic-metrics:record-retry-total:{client-id=perf-producer-client, topic=test-topic} : 0.000
     producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=test-topic}   : 15.471
     producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=test-topic}  : 500.000
     pod "kafka-producer" deleted
  4. Wait for one minute, and verify the Kafka consumer application Deployment status

     $ kubectl get deployment/kafka-consumer-app

    Your output should look like the one below:

     NAME                 READY   UP-TO-DATE   AVAILABLE   AGE
     kafka-consumer-app   5/5     5            5           2m48s

    Verify that the deployment has scaled up to five pods

  5. Navigate back to the monitoring terminal, and verify that your output looks like the one below:

     kafka-consumer-app-6bc79dd94f-jb867   0/1     ContainerCreating   0          0s
     kafka-consumer-app-6bc79dd94f-59bg4   1/1     Running             0          3s
     kafka-consumer-app-6bc79dd94f-f5ll4   1/1     Running             0          5s
     kafka-consumer-app-6bc79dd94f-jb867   1/1     Running             0          7s
     kafka-consumer-app-6bc79dd94f-wzrkp   0/1     Pending             0          0s
     kafka-consumer-app-6bc79dd94f-wzrkp   0/1     Pending             0          0s
     kafka-consumer-app-6bc79dd94f-wzrkp   0/1     ContainerCreating   0          0s
     kafka-consumer-app-6bc79dd94f-wzrkp   1/1     Running             0          3s

    Press Ctrl + C to exit the watch session

KEDA Autoscaling Results

Autoscaling is driven by the KEDA ScaledObject you created earlier. Every 30 seconds (pollingInterval), KEDA checks the lag of the my-group consumer group on test-topic. With lagThreshold set to 5, the Horizontal Pod Autoscaler (HPA) that KEDA manages targets about five unprocessed messages per replica, so a growing backlog scales out the kafka-consumer-app deployment, up to the maxReplicaCount of 5 replicas.
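The following Go sketch is a simplified model of that calculation, assuming the HPA's average-value semantics: the target is lagThreshold messages of lag per replica, rounded up, capped at the partition count (the Kafka scaler's behavior when allowIdleConsumers is left at its default of false), and clamped to minReplicaCount and maxReplicaCount. The desiredReplicas function is hypothetical, and it ignores details such as the HPA's default 10% tolerance and KEDA's per-partition lag options.

```go
package main

import "fmt"

// desiredReplicas is a simplified, hypothetical model of the replica count
// that the Kafka trigger in scaled-object.yaml asks for. lagThreshold must
// be positive.
func desiredReplicas(totalLag, lagThreshold int64, partitions, minReplicas, maxReplicas int) int {
	// Target lagThreshold messages of lag per replica, rounding up.
	want := int((totalLag + lagThreshold - 1) / lagThreshold)
	// Replicas beyond the partition count would sit idle, so cap there.
	if want > partitions {
		want = partitions
	}
	// Clamp to minReplicaCount and maxReplicaCount.
	if want < minReplicas {
		want = minReplicas
	}
	if want > maxReplicas {
		want = maxReplicas
	}
	return want
}

func main() {
	// lagThreshold 5, 5 partitions, minReplicaCount 1, maxReplicaCount 5.
	fmt.Println(desiredReplicas(500, 5, 5, 1, 5)) // 5
	fmt.Println(desiredReplicas(12, 5, 5, 1, 5))  // 3
	fmt.Println(desiredReplicas(0, 5, 5, 1, 5))   // 1
}
```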

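KEDA also scales the deployment back in to 1 replica after the consumers finish processing the messages and the consumer lag drops. Scale-in takes a few minutes because the HPA waits out a scale-down stabilization window before removing pods.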

Conclusion

In this guide, you used KEDA to auto-scale a Kafka consumer application deployed on a Vultr Kubernetes Engine (VKE) cluster. You set up KEDA and Kafka, deployed the application, created a KEDA ScaledObject, and verified the auto-scaling behavior. For more information about KEDA, visit the official documentation.
