How to Use Kafka Streams for Stateful and Stateless Data Processing

Updated on November 23, 2022

Introduction

Kafka Streams is a Java library for building stream processing applications with Apache Kafka. It is embedded within Java applications to process streaming data in Kafka topics. It's a standalone library that depends only on Kafka, using it as the foundation for high availability and reliability.

Kafka Streams has two types of APIs:

  1. Streams DSL - A high-level API
  2. Processor - A low-level API

This guide covers the high-level Streams DSL API, which provides a functional programming model to write stream processing topologies concisely in a few lines of code. The Streams DSL API offers abstractions such as KStream, KTable, etc.

One way of breaking it down is to categorize the functionality offered by these APIs as follows:

  1. Stateless operations
  2. Stateful operations

This guide will include code examples to demonstrate stateless operations such as map and filter, and stateful computations like aggregate and count.

Kafka Streams Stateless functions

The following KStream methods are covered in this section:

  • map
  • filter
  • groupBy and groupByKey
  • through and to
  • print and peek
  • merge

map

map transforms each record in a KStream by applying a function to it. It can be used to transform both the key and the value. If you only want to transform the value, use the mapValues method. The flatMap method can return multiple records (KeyValue pairs) for a single input record.

Let's look at a few examples.

map can be used to convert the key and value of each KStream record to lowercase Strings:

KStream<String, String> words = builder.stream("words");

words.map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
    @Override
    public KeyValue<String, String> apply(String key, String val) {
        return new KeyValue<>(key.toLowerCase(), val.toLowerCase());
    }
});

Or, just use mapValues to work with values only:

words.mapValues(new ValueMapper<String, String>() {
    @Override
    public String apply(String val) {
        return val.toLowerCase();
    }
});

Using flatMap, you can break a single record down into multiple records. For example, to split a comma-separated value into several records that share the same key:

stream.flatMap(new KeyValueMapper<String, String, Iterable<? extends KeyValue<? extends String, ? extends String>>>() {
    @Override
    public Iterable<? extends KeyValue<? extends String, ? extends String>> apply(String key, String val) {
        String[] values = val.split(",");
        return Arrays.stream(values)
                     .map(value -> new KeyValue<>(key, value))
                     .collect(Collectors.toList());
    }
});
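
If the key should stay untouched, flatMapValues achieves the same expansion on the value alone. Here is a minimal sketch (reusing the same comma-separated input as above):

// flatMapValues keeps the original key and emits one record per value
stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    @Override
    public Iterable<String> apply(String val) {
        return Arrays.asList(val.split(","));
    }
});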

filter

The filter method retains only the records in a KStream that satisfy a given criterion. It is complemented by the filterNot method, which is used to exclude records. In both cases, the filtering criterion is defined using a Predicate object.

For example, to only process transactions of a specific credit card type:

KStream<String, Transaction> transactions = builder.stream("user-transactions");

transactions.filter(new Predicate<String, Transaction>() {
    @Override
    public boolean test(String userID, Transaction tx) {
        return tx.cardType().equals("VISA");
    }
});

To ignore/exclude all users who haven't set their password:

KStream<String, User> users = builder.stream("users");

users.filterNot(new Predicate<String, User>() {
    @Override
    public boolean test(String userID, User user) {
        // filterNot drops the records for which the predicate returns true,
        // i.e. users who have not set a password
        return !user.isPwdSet();
    }
});

groupBy and groupByKey

Grouping operations are often used to convert the contents of a KStream to a KGroupedStream in order to perform stateful computations (covered later in this guide). This can be achieved using groupByKey or the more general groupBy method.

While using groupByKey is straightforward, groupBy accepts a KeyValueMapper that lets you group by a different key. For example, you can use it to group user transactions based on card type:

KStream<String, User> transactions = builder.stream("transactions");

KGroupedStream<String, User> grouped = transactions.groupBy(new KeyValueMapper<String, User, String>() {
    @Override
    public String apply(String txID, User user) {
        return user.getCardType();
    }
});
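
In contrast, groupByKey keeps the existing record key. A minimal sketch (the user-transactions topic and Transaction type are reused from the filter example above):

// groupByKey groups records by their existing key (the user ID here),
// which is a common precursor to counting or aggregating per key
KStream<String, Transaction> userTransactions = builder.stream("user-transactions");
KGroupedStream<String, Transaction> byUser = userTransactions.groupByKey();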

to and through

The to method is different from some of the other operations you've encountered so far. It's simple, yet powerful: it lets you materialize (store) KStream records to another topic in Kafka with a single method call.

It returns a void result instead of a KStream (or KTable) - such operations are known as terminal operations.

In this example, each word is converted to lowercase using the mapValues operation (covered earlier) and then sent to a topic named lowercase-words.

KStream<String, String> words = builder.stream("words");

words.mapValues(new ValueMapper<String, String>() {
        @Override
        public String apply(String val) {
            return val.toLowerCase();
        }
    })
    .to("lowercase-words");

through is another simple yet powerful operation that is often used to complement the to method while building streaming pipelines. To continue the above example: say that after storing all the lowercase words in a topic, you need to remove all the words that contain a specific character (e.g. a hyphen -) and store the final results in another Kafka topic. Instead of calling to and then creating a new KStream from the topic lowercase-words, it's possible to simplify the code like this (note that recent Kafka versions deprecate through in favor of the repartition method):

KStream<String, String> words = builder.stream("words");

words.mapValues(new ValueMapper<String, String>() {
        @Override
        public String apply(String val) {
            return val.toLowerCase();
        }
    })
    .through("lowercase-words")
    .filter(new Predicate<String, String>() {
        @Override
        public boolean test(String k, String v) {
            // keep only the words that do not contain a hyphen
            return !v.contains("-");
        }
    })
    .to("processed-words");

print and peek

If you want to log the KStream records (for debugging purposes), print is a handy method. It's also a terminal operation, just like to. The behavior of this method can be configured using the Printed object that print accepts.

For example, to log the values of a KStream to standard output:

KStream<String, String> words = builder.stream("words");

words.mapValues(new ValueMapper<String, String>() {
        @Override
        public String apply(String val) {
            return val.toLowerCase();
        }
    })
    .print(Printed.withLabel("demo").toSysOut());

The peek method is similar to print in terms of functionality, but it's not a terminal operation. Instead, it accepts a ForeachAction that defines what to do for each record, and it returns the stream unchanged so that processing can continue.

In this example, we simply log the key and value to standard out:

KStream<String, String> words = builder.stream("words");

words.mapValues(new ValueMapper<String, String>() {
        @Override
        public String apply(String val) {
            return val.toLowerCase();
        }
    })
    .peek(new ForeachAction<String, String>() {
        @Override
        public void apply(String k, String v) {
            System.out.println("key is " + k + ", value is " + v);
        }
    })
    .to("lowercase-words");

merge

If you have two streams and need to combine them, use merge.

KStream<String, String> fte = builder.stream("fte");
KStream<String, String> contractor = builder.stream("contractors");

fte.merge(contractor).to("all-employees");

Kafka Streams Stateful functions

This section will cover aggregation operations (aggregate, count and reduce) along with an overview of Windowing in Kafka Streams. A side-effect of all these operations is "state" (hence the name Stateful operations), and it's important to understand where it's stored and how it's managed.

The state associated with these operations is stored in local "state stores" - either in-memory or on disk. This "data locality" makes the processing much more efficient. You can also configure your application so that the state store data is sent to Kafka topics (known as changelog topics). This is important for high availability and fault tolerance, since the data can be restored from Kafka if the application fails or crashes.

Let's go over some of these stateful operations.

count

This operation is supported by KGroupedStream and makes it convenient to count the number of records per key with a single method call.

Continuing with the groupBy example presented earlier: once the transactions are grouped by card type, count can be used to get the number of transactions for each card type.

KStream<String, User> transactions = builder.stream("transactions");

KGroupedStream<String, User> grouped = transactions.groupBy(new KeyValueMapper<String, User, String>() {
    @Override
    public String apply(String txID, User user) {
        return user.getCardType();
    }
});

KTable<String, Long> txPerCardType = grouped.count();

The state (count) is kept in a local state store; count accepts an instance of Materialized, which can be used to name (and further configure) that store:

KTable<String, Long> txPerCardType = grouped.count(Materialized.as("tx-per-card-type"));
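
By default, the state store is backed by a changelog topic in Kafka, which is what makes restoration after a crash possible. As a minimal sketch, the changelog topic's configuration can be tuned via Materialized (the store name tx-per-card-type is reused from above; the min.insync.replicas setting is just an illustrative topic-level config):

// changelog logging is on by default; withLoggingEnabled lets you
// pass extra topic-level configuration for the changelog topic
Map<String, String> changelogConfig = new HashMap<>();
changelogConfig.put("min.insync.replicas", "2");

KTable<String, Long> txCounts = grouped.count(
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("tx-per-card-type")
        .withLoggingEnabled(changelogConfig));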

aggregate

aggregate comes in handy for computations like moving averages over a streaming data set. Such computations are stateful: they must take into account both the incoming record's value and the aggregate's current value.

A good way to understand aggregate is to use it to implement the count operation. When the first record is received, the Initializer is used to initialize the state (in this example, the count is set to zero). After that, the Aggregator takes over: whenever a record is received, the current count is incremented by one.

KStream<String, String> stream = builder.stream("transactions");

// Result is assumed to be a simple value class holding a key and a count
KTable<String, Result> aggregate = stream.groupByKey()
    .aggregate(new Initializer<Result>() {
        @Override
        public Result apply() {
            return new Result("", 0);
        }
    }, new Aggregator<String, String, Result>() {
        @Override
        public Result apply(String k, String v, Result count) {
            Integer currentCount = count.getCount();
            return new Result(k, currentCount + 1);
        }
    });

reduce

The reduce operation can be used to combine streams of values and implement sum, min, max, etc. You can think of aggregate as a generalized version of reduce: with reduce, the result must have the same type as the values in the stream, whereas aggregate can produce a result of a different type.
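
As a minimal sketch, here is reduce used to keep a running total per key (the purchases topic is illustrative, with amounts carried as String values to match the String-typed streams used throughout this guide):

KStream<String, String> purchases = builder.stream("purchases");

KTable<String, String> runningTotal = purchases.groupByKey()
    .reduce(new Reducer<String>() {
        @Override
        public String apply(String aggValue, String newValue) {
            // combine the current aggregate with the newly arrived value
            return String.valueOf(Long.parseLong(aggValue) + Long.parseLong(newValue));
        }
    });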

Windowing with Kafka Streams

Windowing lets you confine stream processing operations so that they execute within a time range. For example, a common requirement for website analytics is to have metrics about the number of unique page views per hour, clicks per minute, etc.

Supported window types include tumbling, hopping, sliding, and session windows.

For counting page views per hour, you can use a tumbling time window of 60 minutes. Page views for a product from 1 PM to 2 PM will be aggregated into one bucket, and a fresh window will start after that. Here is an example of how you might achieve this:

KStream<Product, Long> views = builder.stream("product-views");

views.groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(60)))
    .count()
    .toStream()
    .to("views-per-hour");

Conclusion

This guide provided an introduction to Kafka Streams and the types of APIs it offers. This was followed by coverage of commonly used stateless and stateful operations, along with examples. You can refer to the Kafka Streams Javadocs and the Kafka documentation for further reading.