
Introduction
In this article, you will learn how to run Apache Kafka on Kubernetes using the open-source Strimzi project. You will set up Strimzi on Vultr Kubernetes Engine, create a Kafka cluster, send messages to a topic, and receive messages from that topic. You will also secure the Kafka cluster using encryption and user authentication.
Introduction to Apache Kafka
Apache Kafka is a messaging system that allows clients to publish and read streams of data (also called events). It has an ecosystem of open-source solutions that you can combine to store, process, and integrate these data streams with other parts of your system in a secure, reliable, and scalable manner.
Key components of Apache Kafka:
- Broker (Node): A Kafka broker runs the Kafka JVM process. A best practice is to run three or more brokers for scalability and high availability. A group of Kafka brokers forms a cluster.
- Producers: These are client applications that send messages to Kafka. Each message is nothing but a key-value pair.
- Topics: Events (messages) are stored in topics, and each topic has one or more partitions. These partitions are distributed across the Kafka cluster for high availability and redundancy.
- Consumers: Just like producers, consumers are also client applications. They receive and process data/events from Kafka topics.
Introduction to Strimzi
Strimzi can be used to run an Apache Kafka cluster on Kubernetes. In addition to the cluster itself, Strimzi also helps you manage topics, users, MirrorMaker, and Kafka Connect deployments. With Strimzi, you can configure the cluster as per your needs. This includes advanced features such as rack awareness to distribute Kafka nodes across availability zones, as well as Kubernetes taints and tolerations to pin Kafka to dedicated worker nodes in your Kubernetes cluster. You can also expose Kafka to clients outside the Kubernetes cluster using Service types such as NodePort and LoadBalancer, and these endpoints can be secured with TLS.
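As an illustration, an external listener declared inside the `Kafka` custom resource might look like the sketch below. This is a hypothetical fragment, not part of this tutorial's configuration; the listener name and port are illustrative.

```yaml
# Sketch: fragment of spec.kafka in a Kafka custom resource.
# Exposes Kafka outside the Kubernetes cluster via a LoadBalancer Service.
listeners:
  - name: external      # illustrative name
    port: 9094          # illustrative port
    type: loadbalancer
    tls: true           # encrypt external traffic
```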
All this is made possible by a combination of custom resources, operators, and the corresponding container images.
Strimzi Custom Resources and Operators
You can customize Strimzi Kafka components in a Kubernetes cluster using custom resources. These are created as instances of APIs added by Custom resource definitions (CRDs) that extend Kubernetes resources. Each Strimzi component has an associated CRD which is used to describe that component. Thanks to CRDs, Strimzi resources benefit from Kubernetes features such as CLI accessibility and configuration validation.
Once a Strimzi custom resource is created, it's managed using Operators. Operators are a method of packaging, deploying, and managing a Kubernetes-native application. Because Strimzi Operators automate common and complex tasks related to a Kafka deployment, Kafka administration tasks are simplified and require less manual intervention.
Let's look at the Strimzi operators and the custom resources they manage.
Cluster Operator
Strimzi Cluster Operator is used to deploy and manage Kafka components. Although a single Cluster Operator instance is deployed by default, you can add replicas with leader election to ensure operator high availability.
The Cluster Operator manages the following Kafka components:
- Kafka - The `Kafka` resource is used to configure a Kafka deployment. Configuration options for the ZooKeeper cluster are also included within the `Kafka` resource.
- KafkaConnector - This resource allows you to create and manage connector instances for Kafka Connect.
- KafkaMirrorMaker2 - This resource is used to run and manage a Kafka MirrorMaker 2.0 deployment. MirrorMaker 2.0 replicates data between two or more Kafka clusters, within or across data centers.
- KafkaBridge - This resource manages Kafka Bridge, a component that provides an API for integrating HTTP-based clients with a Kafka cluster.
Entity Operator
The Entity Operator comprises the Topic Operator and the User Operator.
- Topic Operator - It provides a way of managing topics in a Kafka cluster through Kubernetes resources. You can declare a `KafkaTopic` resource as part of your application's deployment, and the Topic Operator will create the topic for you and keep it in sync with the corresponding Kafka topic. Information about each topic is held in a topic store, which is continually synchronized with updates from Kafka topics or Kubernetes `KafkaTopic` custom resources. If a topic is reconfigured or reassigned to other brokers, the `KafkaTopic` will always be up to date.
- User Operator - It allows you to declare a `KafkaUser` resource as part of your application's deployment, along with the authentication and authorization mechanisms for the user. You can also configure user quotas that control usage of Kafka resources. In addition to managing credentials for authentication, the User Operator also manages authorization rules by including a description of the user's access rights in the `KafkaUser` declaration.
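As an illustration, a `KafkaTopic` declaration managed by the Topic Operator might look like the sketch below. The topic name, cluster label, and partition counts are hypothetical, not part of this tutorial.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic                    # hypothetical topic name
  labels:
    strimzi.io/cluster: my-cluster  # must match the target Kafka cluster's name
spec:
  partitions: 3
  replicas: 1
```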
Prerequisites
Install kubectl on your local workstation. It is a Kubernetes command-line tool that allows you to run commands against Kubernetes clusters.

Deploy a Vultr Kubernetes Engine (VKE) cluster using the Reference Guide. Once it's deployed, from the Overview tab, click the Download Configuration button in the upper-right corner to download your `kubeconfig` file and save it to a local directory.

Point kubectl to the Vultr Kubernetes Engine cluster by setting the `KUBECONFIG` environment variable to the path where you downloaded the cluster `kubeconfig` file in the previous step:

```
export KUBECONFIG=<path to VKE kubeconfig file>
```

Verify the current context:

```
kubectl config current-context
```
Install Strimzi on Vultr Kubernetes Engine
Create a namespace called `kafka`:

```
kubectl create namespace kafka
```

You should see this output:

```
namespace/kafka created
```

Apply the Strimzi installation files, including `ClusterRoles`, `ClusterRoleBindings`, and Custom Resource Definitions (CRDs):

```
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
```

You should see this output:

```
clusterrole.rbac.authorization.k8s.io/strimzi-kafka-broker created
clusterrole.rbac.authorization.k8s.io/strimzi-cluster-operator-namespaced created
customresourcedefinition.apiextensions.k8s.io/kafkamirrormaker2s.kafka.strimzi.io created
rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-leader-election created
customresourcedefinition.apiextensions.k8s.io/kafkaconnectors.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkabridges.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkamirrormakers.kafka.strimzi.io created
clusterrolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-kafka-broker-delegation created
rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-watched created
clusterrolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator created
customresourcedefinition.apiextensions.k8s.io/kafkatopics.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkaconnects.kafka.strimzi.io created
deployment.apps/strimzi-cluster-operator created
clusterrole.rbac.authorization.k8s.io/strimzi-cluster-operator-global created
customresourcedefinition.apiextensions.k8s.io/kafkarebalances.kafka.strimzi.io created
clusterrole.rbac.authorization.k8s.io/strimzi-cluster-operator-leader-election created
clusterrolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-kafka-client-delegation created
customresourcedefinition.apiextensions.k8s.io/kafkausers.kafka.strimzi.io created
clusterrole.rbac.authorization.k8s.io/strimzi-cluster-operator-watched created
clusterrole.rbac.authorization.k8s.io/strimzi-kafka-client created
configmap/strimzi-cluster-operator created
clusterrole.rbac.authorization.k8s.io/strimzi-entity-operator created
rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-entity-operator-delegation created
rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator created
serviceaccount/strimzi-cluster-operator created
customresourcedefinition.apiextensions.k8s.io/strimzipodsets.core.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkas.kafka.strimzi.io created
```

Follow the deployment of the Strimzi cluster operator and wait for the `Pod` to transition to `Running` status:

```
kubectl get pod -n kafka --watch
```

You should see this output (the `Pod` name might differ in your case):

```
NAME                                        READY   STATUS    RESTARTS   AGE
strimzi-cluster-operator-56d64c8584-7k6sr   1/1     Running   0          43s
```

To check the operator's log:

```
kubectl logs deployment/strimzi-cluster-operator -n kafka -f
```
Set up a Kafka cluster
Create a directory and switch to it:
```
mkdir vultr-vke-kafka
cd vultr-vke-kafka
```

Create a new file `kafka-cluster-1.yml`:

```
touch kafka-cluster-1.yml
```

Add the below contents to the `kafka-cluster-1.yml` file and save it:

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster-1
spec:
  kafka:
    version: 3.3.1
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.3"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
```

Install the Kafka cluster:

```
kubectl apply -f kafka-cluster-1.yml -n kafka
```

You should see this output:

```
kafka.kafka.strimzi.io/my-cluster-1 created
```

Wait for the cluster to be created:

```
kubectl wait kafka/my-cluster-1 --for=condition=Ready --timeout=300s -n kafka
```

Once completed, you will see this output:

```
kafka.kafka.strimzi.io/my-cluster-1 condition met
```

Verify the Kafka cluster:

```
kubectl get kafka -n kafka
```
You should see this output:
```
NAME           DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS   READY   WARNINGS
my-cluster-1   1                        1                     True    True
```

Verify the Kafka `Pod`:

```
kubectl get pod/my-cluster-1-kafka-0 -n kafka
```

You should see this output:

```
NAME                   READY   STATUS    RESTARTS   AGE
my-cluster-1-kafka-0   1/1     Running   0          9m23s
```

Verify the ZooKeeper `Pod`:

```
kubectl get pod/my-cluster-1-zookeeper-0 -n kafka
```

You should see this output:

```
NAME                       READY   STATUS    RESTARTS   AGE
my-cluster-1-zookeeper-0   1/1     Running   0          10m
```

Check the `ConfigMap`s associated with the cluster:

```
kubectl get configmap -n kafka
```

You should see this output:

```
NAME                                        DATA   AGE
kube-root-ca.crt                            1      57m
my-cluster-1-entity-topic-operator-config   1      9m20s
my-cluster-1-entity-user-operator-config    1      9m20s
my-cluster-1-kafka-0                        3      9m45s
my-cluster-1-zookeeper-config               2      10m
strimzi-cluster-operator                    1      57m
```

Check the `Service`s associated with the cluster:

```
kubectl get svc -n kafka
```

You should see this output (the `ClusterIP`s might differ in your case):

```
NAME                            TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                               AGE
my-cluster-1-kafka-bootstrap    ClusterIP   10.96.23.73     <none>        9091/TCP,9092/TCP,9093/TCP            10m
my-cluster-1-kafka-brokers      ClusterIP   None            <none>        9090/TCP,9091/TCP,9092/TCP,9093/TCP   10m
my-cluster-1-zookeeper-client   ClusterIP   10.108.246.28   <none>        2181/TCP                              10m
my-cluster-1-zookeeper-nodes    ClusterIP   None            <none>        2181/TCP,2888/TCP,3888/TCP            10m
```

Check the `Secret`s associated with the cluster:

```
kubectl get secret -n kafka
```

You should see this output:

```
NAME                                       TYPE     DATA   AGE
my-cluster-1-clients-ca                    Opaque   1      10m
my-cluster-1-clients-ca-cert               Opaque   3      10m
my-cluster-1-cluster-ca                    Opaque   1      10m
my-cluster-1-cluster-ca-cert               Opaque   3      10m
my-cluster-1-cluster-operator-certs        Opaque   4      10m
my-cluster-1-entity-topic-operator-certs   Opaque   4      9m44s
my-cluster-1-entity-user-operator-certs    Opaque   4      9m44s
my-cluster-1-kafka-brokers                 Opaque   4      10m
my-cluster-1-zookeeper-nodes               Opaque   4      10m
```
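The cluster above uses a single replica for simplicity. For anything beyond experimentation, you would raise the replica counts and the matching replication settings. A sketch of the relevant fields in the `Kafka` resource (the values shown are illustrative, not part of this tutorial):

```yaml
# Sketch: production-leaning replica settings for the Kafka custom resource.
spec:
  kafka:
    replicas: 3
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
  zookeeper:
    replicas: 3
```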
You can test the Kafka cluster using the Kafka CLI-based producer and consumer.
Verify cluster operation
You will verify cluster functionality by producing data with the Kafka CLI producer and consuming it with the Kafka CLI consumer.

Run a `Pod` to execute the Kafka CLI producer and send data to a topic:

```
kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-1-kafka-bootstrap:9092 --topic my-topic
```

You should see the following prompt:

```
If you don't see a command prompt, try pressing enter.
>
```

Enter messages at the prompt. These will be sent to the specified Kafka topic.

Open a new terminal. Point kubectl to the Vultr Kubernetes Engine cluster by setting the `KUBECONFIG` environment variable to the path where you downloaded the cluster `kubeconfig` file:

```
export KUBECONFIG=<path to VKE kubeconfig file>
```

Run a `Pod` to execute the Kafka CLI consumer and consume data from the topic:

```
kubectl -n kafka run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-1-kafka-bootstrap:9092 --topic my-topic --from-beginning
```

You should receive the messages you sent from the producer terminal.

Press Ctrl+C in each terminal to close them. This will delete both `Pod`s.

Delete the Kafka cluster:

```
kubectl delete -f kafka-cluster-1.yml -n kafka
```

Verify that the associated `Pod`s were deleted. Wait for the `my-cluster-1-kafka-0` and `my-cluster-1-zookeeper-0` `Pod`s to terminate:

```
kubectl get pods -n kafka
```
Set up a secure Kafka cluster
So far, you have set up a simple Kafka cluster. In the next section, you will learn how to secure the setup using the following:
- Encryption via TLS.
- Authentication via SASL SCRAM.
Create a new file `kafka-cluster-2.yml`:

```
touch kafka-cluster-2.yml
```

Add the below contents to the `kafka-cluster-2.yml` file and save it:

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster-2
spec:
  kafka:
    version: 3.3.1
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: true
        authentication:
          type: scram-sha-512
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.3"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
```

Install the Kafka cluster:

```
kubectl apply -f kafka-cluster-2.yml -n kafka
```

You should see this output:

```
kafka.kafka.strimzi.io/my-cluster-2 created
```

Wait for the cluster to be created:

```
kubectl wait kafka/my-cluster-2 --for=condition=Ready --timeout=300s -n kafka
```

Once completed, you will see this output:

```
kafka.kafka.strimzi.io/my-cluster-2 condition met
```

Verify the Kafka cluster:

```
kubectl get kafka -n kafka
```

You should see this output:

```
NAME           DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS   READY   WARNINGS
my-cluster-2   1                        1                     True    True
```

Create a new file `kafka-user.yml`:

```
touch kafka-user.yml
```

Add the below contents to the `kafka-user.yml` file and save it:

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: test-kafka-user
  labels:
    strimzi.io/cluster: my-cluster-2
spec:
  authentication:
    type: scram-sha-512
```

Create the `KafkaUser` resource:

```
kubectl apply -f kafka-user.yml -n kafka
```

You should see this output:

```
kafkauser.kafka.strimzi.io/test-kafka-user created
```

Verify user creation:

```
kubectl get kafkauser -n kafka
```

You should see this output:

```
NAME              CLUSTER        AUTHENTICATION   AUTHORIZATION   READY
test-kafka-user   my-cluster-2   scram-sha-512                    True
```

When the user is created, the User Operator creates a Kubernetes `Secret` and seeds it with the credentials required to authenticate to the Kafka cluster.

Verify the `Secret`:

```
kubectl get secret/test-kafka-user -n kafka -o yaml
```
Verify cluster operation
You will verify cluster functionality by producing data with the Kafka CLI producer and consuming it with the Kafka CLI consumer. This time:
- The CLI clients will connect to the Kafka broker over TLS.
- The CLI clients will need to authenticate to the Kafka broker using a username and password.
Send data to a Kafka topic
Fetch the password for the Kafka user that you created and save it to your local workstation:

```
kubectl get secret test-kafka-user -n kafka -o jsonpath='{.data.password}' | base64 --decode > user.password
```

Fetch the Kafka cluster CA certificate and save it to your local workstation:

```
kubectl get secret my-cluster-2-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' -n kafka | base64 --decode > ca.p12
```

Fetch the CA certificate password and save it to your local workstation:

```
kubectl get secret my-cluster-2-cluster-ca-cert -o jsonpath='{.data.ca\.password}' -n kafka | base64 --decode > ca.password
```

Open a new terminal. Point kubectl to the Vultr Kubernetes Engine cluster by setting the `KUBECONFIG` environment variable to the path where you downloaded the cluster `kubeconfig` file:

```
export KUBECONFIG=<path to VKE kubeconfig file>
```

Start a new `Pod` named `kafka-producer`:

```
kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never
```

You should see a shell prompt after the `Pod` starts:

```
[kafka@kafka-producer kafka]$
```

From the previous terminal, copy the local certificate into the `kafka-producer` `Pod` that you just started:

```
kubectl cp ca.p12 kafka-producer:/tmp -n kafka
```

Go back to the terminal where the `kafka-producer` `Pod` is running and execute the below commands:

```
cp $JAVA_HOME/lib/security/cacerts /tmp/cacerts
chmod 777 /tmp/cacerts
```

Import the server CA certificate into the keystore. For `keypass`, use the password you saved to your local `ca.password` file:

```
keytool -importcert -alias strimzi-kafka-cert -file /tmp/ca.p12 -keystore /tmp/cacerts -keypass <password from ca.password file> -storepass changeit -noprompt
```

You should see this output:

```
Certificate was added to keystore
```

Create the configuration file which will be used by the Kafka CLI producer. For `password`, use the password you saved to your local `user.password` file:

```
cat > /tmp/producer.properties << EOF
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="test-kafka-user" password="<password from user.password file>";
ssl.truststore.location=/tmp/cacerts
ssl.truststore.password=changeit
EOF
```

Send data to a topic:

```
bin/kafka-console-producer.sh --bootstrap-server my-cluster-2-kafka-bootstrap:9092 --topic my-topic --producer.config /tmp/producer.properties
```

You should see the following prompt:

```
If you don't see a command prompt, try pressing enter.
>
```

Enter messages at the prompt. These will be sent to the specified Kafka topic.
Receive data from a Kafka topic
Open a new terminal. Point kubectl to the Vultr Kubernetes Engine cluster by setting the `KUBECONFIG` environment variable to the path where you downloaded the cluster `kubeconfig` file:

```
export KUBECONFIG=<path to VKE kubeconfig file>
```

Start a new `Pod` named `kafka-consumer`:

```
kubectl -n kafka run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never
```

You should see a shell prompt after the `Pod` starts:

```
[kafka@kafka-consumer kafka]$
```

From the previous terminal, copy the local certificate into the `kafka-consumer` `Pod` that you just started:

```
kubectl cp ca.p12 kafka-consumer:/tmp -n kafka
```

Go back to the terminal where the `kafka-consumer` `Pod` is running and execute the below commands:

```
cp $JAVA_HOME/lib/security/cacerts /tmp/cacerts
chmod 777 /tmp/cacerts
```

Import the server CA certificate into the keystore. For `keypass`, use the password you saved to your local `ca.password` file:

```
keytool -importcert -alias strimzi-kafka-cert -file /tmp/ca.p12 -keystore /tmp/cacerts -keypass <password from ca.password file> -storepass changeit -noprompt
```

You should see this output:

```
Certificate was added to keystore
```

Create the configuration file which will be used by the Kafka CLI consumer. For `password`, use the password you saved to your local `user.password` file:

```
cat > /tmp/consumer.properties << EOF
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="test-kafka-user" password="<password from user.password file>";
ssl.truststore.location=/tmp/cacerts
ssl.truststore.password=changeit
EOF
```

Use the Kafka CLI consumer to consume data from the topic:

```
bin/kafka-console-consumer.sh --bootstrap-server my-cluster-2-kafka-bootstrap:9092 --topic my-topic --consumer.config /tmp/consumer.properties --from-beginning
```

You should receive the messages you sent from the producer terminal.
From a new terminal, delete the Kafka cluster:

```
kubectl delete -f kafka-cluster-2.yml -n kafka
```

Verify that the associated `Pod`s were deleted. Wait for the `my-cluster-2-kafka-0` and `my-cluster-2-zookeeper-0` `Pod`s to terminate:

```
kubectl get pods -n kafka
```
Delete Vultr Kubernetes Engine cluster
After you have completed the tutorial in this article, you can delete the Vultr Kubernetes Engine cluster.
Conclusion
In this article, you learnt how to use Strimzi to run Kafka and its related components on Kubernetes. You installed Strimzi on Vultr Kubernetes Engine, set up a Kafka cluster, sent messages to a topic, and received messages from that topic. Next, you secured the Kafka cluster by enforcing TLS encryption as well as SASL authentication. TLS encryption ensured that clients could only connect over an encrypted channel, and with SASL authentication, clients had to present a username and password to interact with the cluster (send or receive data).
To learn more, refer to the Strimzi and Apache Kafka documentation.