Scale RabbitMQ applications on Vultr Kubernetes Engine with KEDA

Updated on June 22, 2024

Introduction

KEDA is a Kubernetes-based Event Driven Autoscaler that scales any container workload in a cluster based on the number of events that need to be processed. It works alongside standard Kubernetes components such as the Horizontal Pod Autoscaler (HPA), letting you declare the applications you want to scale in an event-driven manner. KEDA scalers are components that monitor external systems to generate metrics and drive the scaling of Kubernetes workloads.

A KEDA RabbitMQ scaler can scale workloads based on the length of a RabbitMQ queue. This article demonstrates how to use the RabbitMQ scaler on a Vultr Kubernetes Engine (VKE) cluster to allow RabbitMQ consumer application Pods to scale up and down based on the number of unprocessed items in the queue.

Prerequisites

Before you begin:

Note
Ensure that your Kubernetes cluster nodes have at least 2 vCPUs and 4096 MB of memory each. Verify that your VKE deployment region supports Vultr Block Storage.

Install the KEDA Operator

  1. Create a new project files directory.

    console
    $ mkdir vke-keda-rabbitmq
    
  2. Switch to the directory.

    console
    $ cd vke-keda-rabbitmq
    
  3. Deploy the latest KEDA operator version to your cluster using Kubectl.

    console
    $ kubectl apply --server-side -f https://github.com/kedacore/keda/releases/download/v2.12.1/keda-2.12.1-core.yaml
    

    The above command installs version 2.12.1 of the operator to your cluster. To find the latest version, visit the KEDA releases page.

    Output:

    customresourcedefinition.apiextensions.k8s.io/clustertriggerauthentications.keda.sh serverside-applied
    customresourcedefinition.apiextensions.k8s.io/scaledjobs.keda.sh serverside-applied
    customresourcedefinition.apiextensions.k8s.io/scaledobjects.keda.sh serverside-applied
    customresourcedefinition.apiextensions.k8s.io/triggerauthentications.keda.sh serverside-applied
    ...
  4. Wait for at least 1 minute, then view all deployments in the new keda namespace to verify that the operator is installed in your cluster.

    console
    $ kubectl get deployment -n keda
    

    Output:

    NAME                     READY   UP-TO-DATE   AVAILABLE   AGE
    keda-metrics-apiserver   1/1     1            1           30s
    keda-operator            1/1     1            1           40s

Install RabbitMQ

  1. Add the Bitnami chart to your local Helm repositories.

    console
    $ helm repo add bitnami https://charts.bitnami.com/bitnami
    
  2. Update your Helm repositories.

    console
    $ helm repo update
    
  3. Deploy RabbitMQ to your cluster.

    console
    $ helm install rabbitmq --set auth.username=user --set auth.password=s3cr3t --set persistence.storageClass=vultr-block-storage --set persistence.size=10Gi bitnami/rabbitmq
    

    RabbitMQ installs to your cluster as a Kubernetes StatefulSet. The above command uses vultr-block-storage as the storage class for RabbitMQ data persistence. When the installation succeeds, navigate to your VKE cluster's Linked Resources tab and verify the attached block storage volumes.

  4. Wait for at least 3 minutes for the installation to complete, then verify that all RabbitMQ Pods are available and running in your cluster.

    console
    $ kubectl get pods -l=app.kubernetes.io/instance=rabbitmq
    

    Output:

    NAME         READY   STATUS    RESTARTS   AGE
    rabbitmq-0   1/1     Running   0          2m28s

Create the RabbitMQ Producer Application

  1. Initialize a new Go module to create a go.mod file.

    console
    $ go mod init rabbitmq-app
    
  2. Using a text editor such as Nano, create a new file producer.go.

    console
    $ nano producer.go
    
  3. Add the following contents to the file.

    go
    package main
    
    import (
        "context"
        "fmt"
        "log"
        "os"
        "strconv"
        "time"
    
        amqp "github.com/rabbitmq/amqp091-go"
    )
    
    var conn *amqp.Connection
    var queueName string
    
    func init() {
        url := os.Getenv("RABBITMQ_URI")
        if url == "" {
            log.Fatal("missing environment variable RABBITMQ_URI")
        }
    
        queueName = os.Getenv("RABBITMQ_QUEUE_NAME")
        if queueName == "" {
            log.Fatal("missing environment variable RABBITMQ_QUEUE_NAME")
        }
    
        var err error
    
        conn, err = amqp.Dial(url)
        if err != nil {
            log.Fatal("dial failed ", err)
        }
    }
    
    func main() {
    
        defer conn.Close()
    
        ch, err := conn.Channel()
        if err != nil {
            log.Fatalf("Failed to create channel: %v", err)
        }
        defer ch.Close()
    
        q, err := ch.QueueDeclare(
            queueName, // queue name
            true,      // durable
            false,     // auto-delete when unused
            false,     // exclusive
            false,     // no-wait
            nil,       // arguments
        )
        if err != nil {
            log.Fatalf("Failed to declare queue: %v", err)
        }
    
        for i := 0; i <= 1000000; i++ {
            _i := strconv.Itoa(i)
    
            msg := "message-" + _i
    
            err = ch.PublishWithContext(
                context.Background(),
                "",     // exchange (default)
                q.Name, // routing key
                false,  // mandatory
                false,  // immediate
                amqp.Publishing{
                    ContentType: "text/plain",
                    Body:        []byte(msg),
                },
            )
            if err != nil {
                log.Fatalf("failed to publish message: %v", err)
            }
    
            fmt.Println("message", msg, "sent to queue", q.Name)
            time.Sleep(1 * time.Second)
        }
    }
    

    Save and close the file.

    The above configuration uses the RabbitMQ connection string and queue name from the available environment variables to establish a connection to RabbitMQ. Then, it uses a for loop to send messages to the queue with a wait duration of 1 second between each message.

Containerize the Producer Application

  1. Create a new file Dockerfile.producer.

    console
    $ nano Dockerfile.producer
    
  2. Add the following contents to the file.

    dockerfile
    FROM golang AS build
    
    WORKDIR /app
    COPY go.mod ./
    COPY go.sum ./
    
    RUN go mod download
    
    COPY producer.go ./
    RUN CGO_ENABLED=1 go build -o /rabbitmq-go-app
    
    FROM cgr.dev/chainguard/glibc-dynamic
    WORKDIR /
    COPY --from=build /rabbitmq-go-app /rabbitmq-go-app
    EXPOSE 8080
    USER nonroot:nonroot
    ENTRYPOINT ["/rabbitmq-go-app"]
    

    Save and close the file.

    The above Dockerfile defines a two-stage build. The first stage uses golang as the base image to compile the producer binary, and the second stage uses the minimal cgr.dev/chainguard/glibc-dynamic base image and copies in the binary produced by the first stage.

Deploy the RabbitMQ Producer Application

  1. Fetch the application Go module dependencies to create the go.sum file.

    console
    $ go mod tidy
    
  2. Log in to your Docker Hub account.

    console
    $ docker login
    

    When prompted, enter your Docker Hub username and password to use Docker Hub as your default registry. Alternatively, you can use the Vultr Container Registry to build and push images for deployment in your cluster.

  3. Build the application image. Replace exampleuser with your actual Docker Hub username.

    console
    $ docker build -t exampleuser/rabbitmq-producer-app -f Dockerfile.producer .
    
  4. Push the image to your Docker Hub profile.

    console
    $ docker push exampleuser/rabbitmq-producer-app
    
  5. Create a new file producer.yaml.

    console
    $ nano producer.yaml
    
  6. Add the following contents to the file. Replace exampleuser with your Docker Hub username.

    yaml
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: rabbitmq-producer
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: rabbitmq-producer
      template:
        metadata:
          labels:
            app: rabbitmq-producer
        spec:
          containers:
            - name: rabbitmq-producer
              image: exampleuser/rabbitmq-producer-app
              imagePullPolicy: Always
              env:
                - name: RABBITMQ_QUEUE_NAME
                  value: demo-queue
                - name: RABBITMQ_URI
                  value: amqp://user:s3cr3t@rabbitmq.default.svc.cluster.local:5672
    

    Save and close the file.

  7. Deploy the producer application to your cluster.

    console
    $ kubectl apply -f producer.yaml
    

    Output:

    deployment.apps/rabbitmq-producer created
  8. Wait at least 20 seconds, then view the rabbitmq-producer Pods to verify that the new application Pod has the Running status.

    console
    $ kubectl get pods -l=app=rabbitmq-producer
    

    Your output should look like the one below:

    NAME                                 READY   STATUS              RESTARTS   AGE
    rabbitmq-producer-847f6866c5-tpxdx   1/1     Running             0          10s
  9. View the producer application logs to verify the message publishing progress.

    console
    $ kubectl logs -f $(kubectl get pod -l=app=rabbitmq-producer -o jsonpath='{.items[0].metadata.name}')
    

    Output:

    message message-0 sent to queue demo-queue
    message message-1 sent to queue demo-queue
    message message-2 sent to queue demo-queue
    message message-3 sent to queue demo-queue
    ....

    The above logs show that the messages are successfully sent to the demo-queue RabbitMQ queue.

Create the RabbitMQ Consumer Application

  1. Create a new file consumer.go.

    console
    $ nano consumer.go
    
  2. Add the following contents to the file.

    go
    package main
    
    import (
        "fmt"
        "log"
        "os"
        "time"
    
        "github.com/google/uuid"
        amqp "github.com/rabbitmq/amqp091-go"
    )
    
    var conn *amqp.Connection
    var queueName string
    var instanceName string
    
    func init() {
    
        url := os.Getenv("RABBITMQ_URI")
        if url == "" {
            log.Fatal("missing environment variable RABBITMQ_URI")
        }
    
        queueName = os.Getenv("RABBITMQ_QUEUE_NAME")
        if queueName == "" {
            log.Fatal("missing environment variable RABBITMQ_QUEUE_NAME")
        }
    
        var err error
    
        conn, err = amqp.Dial(url)
        if err != nil {
            log.Fatal(err)
        }
    
        instanceName = os.Getenv("INSTANCE_NAME")
        if instanceName == "" {
            instanceName = "rabbitmq-consumer-" + uuid.NewString()
        }
    
    }
    
    func main() {
    
        defer conn.Close()
    
        ch, err := conn.Channel()
        if err != nil {
            log.Fatalf("Failed to create channel: %v", err)
        }
        defer ch.Close()
    
        q, err := ch.QueueDeclare(
            queueName, // queue name
            true,      // durable
            false,     // auto-delete when unused
            false,     // exclusive
            false,     // no-wait
            nil,       // arguments
        )
        if err != nil {
            log.Fatalf("Failed to declare queue: %v", err)
        }
    
        err = ch.Qos(
            1,     // prefetch count: one unacknowledged message per consumer
            0,     // prefetch size (unlimited)
            false, // apply per consumer, not per channel
        )
        if err != nil {
            log.Fatalf("failed to set QoS: %v", err)
        }
    
        msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer tag (auto-generated)
            false,  // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // arguments
        )
        if err != nil {
            log.Fatalf("failed to consume messages from queue: %v", err)
        }
    
        fmt.Println("consumer instance", instanceName, "waiting for messages.....")
    
        for msg := range msgs {
            fmt.Println("Instance", instanceName, "received message", string(msg.Body), "from queue", q.Name)
            msg.Ack(false)
            time.Sleep(3 * time.Second)
        }
    }
    

    Save and close the file.

    The above configuration uses the RabbitMQ connection string and queue name environment variables to establish a connection to RabbitMQ. It reads the Pod name from the INSTANCE_NAME environment variable, falling back to a generated UUID-based name if the variable is unset. Then, it declares the queue and ranges over the delivery channel to receive messages, logging and acknowledging each one before pausing for 3 seconds.

  3. To containerize the application for deployment, create a new Dockerfile Dockerfile.consumer.

    console
    $ nano Dockerfile.consumer
    
  4. Add the following contents to the file.

    dockerfile
    FROM golang AS build
    
    WORKDIR /app
    COPY go.mod ./
    COPY go.sum ./
    
    RUN go mod download
    
    COPY consumer.go ./
    RUN CGO_ENABLED=1 go build -o /rabbitmq-go-app
    
    FROM cgr.dev/chainguard/glibc-dynamic
    WORKDIR /
    COPY --from=build /rabbitmq-go-app /rabbitmq-go-app
    EXPOSE 8080
    USER nonroot:nonroot
    ENTRYPOINT ["/rabbitmq-go-app"]
    

    Save and close the file.

    The above Dockerfile uses a two-stage build process. The first stage uses golang as the base image to compile the consumer binary, and the second stage uses cgr.dev/chainguard/glibc-dynamic as the base image and copies in the binary produced by the first stage.
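With the producer publishing one message per second and each consumer pausing 3 seconds per message, a single consumer falls behind and the queue backlog grows; roughly three replicas are needed just to keep up, which is what makes this workload a good autoscaling demonstration. A minimal sketch of that rate arithmetic (the backlogGrowthPerSecond helper is hypothetical):

```go
package main

import "fmt"

// backlogGrowthPerSecond models the net queue growth rate for this demo:
// the producer publishes 1 message per second, and each consumer replica
// (sleeping 3 seconds per message) handles roughly 1/3 message per second.
func backlogGrowthPerSecond(consumers int) float64 {
	const produceRate = 1.0             // messages per second
	const consumeRatePerPod = 1.0 / 3.0 // messages per second per replica
	return produceRate - float64(consumers)*consumeRatePerPod
}

func main() {
	for n := 1; n <= 5; n++ {
		fmt.Printf("%d consumer replica(s): net %+.2f messages/s\n", n, backlogGrowthPerSecond(n))
	}
}
```

A positive net rate means the backlog keeps growing, so KEDA has a reason to add replicas; a negative rate means the queue drains and the deployment can scale back down.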

Deploy the RabbitMQ Consumer Application

  1. Fetch the application Go module dependencies.

    console
    $ go mod tidy
    
  2. Build the application image. Replace exampleuser with your actual Docker Hub username.

    console
    $ docker build -t exampleuser/rabbitmq-consumer-app -f Dockerfile.consumer .
    
  3. Push the application image to your Docker Hub profile.

    console
    $ docker push exampleuser/rabbitmq-consumer-app
    
  4. Create a new file consumer.yaml.

    console
    $ nano consumer.yaml
    
  5. Add the following contents to the file. Replace exampleuser with your Docker Hub username.

    yaml
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: rabbitmq-consumer
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: rabbitmq-consumer
      template:
        metadata:
          labels:
            app: rabbitmq-consumer
        spec:
          containers:
            - name: rabbitmq-consumer
              image: exampleuser/rabbitmq-consumer-app
              imagePullPolicy: Always
              env:
                - name: RABBITMQ_QUEUE_NAME
                  value: demo-queue
                - name: RABBITMQ_URI
                  value: amqp://user:s3cr3t@rabbitmq.default.svc.cluster.local:5672
                - name: INSTANCE_NAME
                  valueFrom:
                    fieldRef:
                      fieldPath: metadata.name
    

    Save and close the file.

  6. Deploy the consumer application to your cluster.

    console
    $ kubectl apply -f consumer.yaml
    

    Output:

    deployment.apps/rabbitmq-consumer created
  7. Wait at least 20 seconds, then verify that the rabbitmq-consumer application Pod is running.

    console
    $ kubectl get pods -l=app=rabbitmq-consumer
    

    Your output should look like the one below:

    NAME                                 READY   STATUS              RESTARTS   AGE
    rabbitmq-consumer-5b8884c78b-2sjds   1/1     Running             0          10s

Create the KEDA Scaler

  1. Create a new YAML manifest scaler.yaml.

    console
    $ nano scaler.yaml
    
  2. Add the following contents to the file.

    yaml
    apiVersion: keda.sh/v1alpha1
    kind: ScaledObject
    metadata:
      name: rabbitmq-scaledobject
      namespace: default
    spec:
      scaleTargetRef:
        name: rabbitmq-consumer
      minReplicaCount: 1
      maxReplicaCount: 5
      triggers:
        - type: rabbitmq
          metadata:
            protocol: amqp
            queueName: demo-queue
            mode: QueueLength
            value: "10"
            hostFromEnv: RABBITMQ_URI
    

    Save and close the file.

    The above ScaledObject targets the rabbitmq-consumer deployment and keeps its replica count between 1 and 5. The rabbitmq trigger uses the QueueLength mode with a value of 10, so KEDA adds roughly one replica for every 10 unprocessed messages in the demo-queue queue. The hostFromEnv field directs KEDA to read the RabbitMQ connection string from the RABBITMQ_URI environment variable defined on the target deployment's container.

  3. Install the Scaler to your cluster.

    console
    $ kubectl apply -f scaler.yaml
    

    Output:

    scaledobject.keda.sh/rabbitmq-scaledobject created

Verify the Application Autoscaling

  1. View the RabbitMQ consumer application deployment status and verify that the READY field shows an increased number of Pods.

    console
    $ kubectl get deployment/rabbitmq-consumer
    

    Your output should look like the one below:

    NAME                READY   UP-TO-DATE   AVAILABLE   AGE
    rabbitmq-consumer   5/5     5            5           53s
  2. View the consumer application Pods and verify the number of running replicas.

    console
    $ kubectl get pods -l=app=rabbitmq-consumer
    

    Output:

    NAME                                 READY   STATUS    RESTARTS   AGE
    rabbitmq-consumer-5b8884c78b-7mhg7   1/1     Running   0          98s
    rabbitmq-consumer-5b8884c78b-ktjf9   1/1     Running   0          83s
    rabbitmq-consumer-5b8884c78b-vx7wv   1/1     Running   0          98s
    rabbitmq-consumer-5b8884c78b-xc2rh   1/1     Running   0          3m25s
    rabbitmq-consumer-5b8884c78b-xfqhr   1/1     Running   0          98s

    The KEDA scaler monitors the demo-queue RabbitMQ queue. When the unprocessed message count exceeds 10, the rabbitmq-consumer deployment automatically scales out. In the above output, only five Pods are available because the maxReplicaCount attribute in the ScaledObject configuration is set to 5. When the consumer Pods clear the queue backlog, KEDA scales the deployment back down to a single Pod.

  3. View the logs of a consumer application Pod of your choice. For example: rabbitmq-consumer-5b8884c78b-7mhg7.

    console
    $ kubectl logs -f rabbitmq-consumer-5b8884c78b-7mhg7
    

    Output:

    Instance rabbitmq-consumer-5b8884c78b-7mhg7 received message message-120 from queue demo-queue
    Instance rabbitmq-consumer-5b8884c78b-7mhg7 received message message-125 from queue demo-queue
    Instance rabbitmq-consumer-5b8884c78b-7mhg7 received message message-130 from queue demo-queue
    Instance rabbitmq-consumer-5b8884c78b-7mhg7 received message message-135 from queue demo-queue
    Instance rabbitmq-consumer-5b8884c78b-7mhg7 received message message-140 from queue demo-queue
    ....

    As the above output shows, the consumer application has five replicas, and message processing is load balanced among them: each log line records the Pod name alongside the message it processed. If you view the logs of the other Pods, they show different messages because RabbitMQ distributes the queued messages across all consumers. This is what allows the consumer application to scale horizontally.
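In QueueLength mode, KEDA targets value queued messages per replica, so the desired replica count is approximately the queue length divided by 10 (rounded up), clamped between minReplicaCount and maxReplicaCount. The sketch below approximates that decision; the exact HPA rounding behavior may differ slightly, and desiredReplicas is a hypothetical helper:

```go
package main

import (
	"fmt"
	"math"
)

// desiredReplicas approximates KEDA's QueueLength scaling decision:
// one replica per `value` queued messages, clamped to the ScaledObject's
// minReplicaCount and maxReplicaCount.
func desiredReplicas(queueLength, value, minReplicas, maxReplicas int) int {
	n := int(math.Ceil(float64(queueLength) / float64(value)))
	if n < minReplicas {
		return minReplicas
	}
	if n > maxReplicas {
		return maxReplicas
	}
	return n
}

func main() {
	// Using this tutorial's ScaledObject settings: value=10, min=1, max=5.
	for _, q := range []int{0, 8, 25, 60, 500} {
		fmt.Printf("queue length %3d -> %d replica(s)\n", q, desiredReplicas(q, 10, 1, 5))
	}
}
```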

Conclusion

You have configured KEDA to scale a RabbitMQ consumer application deployed in your VKE cluster. You set up KEDA and RabbitMQ, then used client applications (a producer and a consumer) to verify the autoscaling behavior. You can apply autoscaling to other applications in your cluster using KEDA. For more information, visit the official KEDA documentation.

Next Steps

To implement more solutions in your cluster, visit the following resources: