How to Use Kafka Streams for Stateful and Stateless Data Processing
Introduction
Kafka Streams is a Java library for building stream processing applications with Apache Kafka. It is embedded within Java applications to process streaming data in Kafka topics. It's a standalone library whose only dependency is Kafka itself, and it uses Kafka as the foundation for high availability and reliability.
Kafka Streams has two types of APIs:
- Streams DSL - A high-level API
- Processor API - A low-level API
This guide covers the high-level Streams DSL API, which provides a functional programming model to write stream processing topologies concisely with a few lines of code. The Streams DSL API offers abstractions such as KStream, KTable, etc.
One way of breaking it down is to categorize the functionality offered by these APIs as follows:
- Stateless operations
- Stateful operations
This guide will include code examples to demonstrate stateless operations such as map and filter, and stateful computations like aggregate and count.
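All the snippets that follow assume the usual Kafka Streams scaffolding around them, in particular a StreamsBuilder named builder. Here is a minimal sketch of that boilerplate; the application ID, the broker address (localhost:9092), and the serde choices are placeholder assumptions:
Properties props = new Properties();
// placeholder application ID and broker address
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-demo");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// every example below defines its topology against a builder like this one
StreamsBuilder builder = new StreamsBuilder();
// ... topology definition from the examples below goes here ...

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// stop the application cleanly on JVM shutdown
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));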
Kafka Streams Stateless functions
The following KStream methods are covered in this section:
- map
- filter
- groupBy and groupByKey
- through and to
- print and peek
- merge
map
map can transform individual records of a KStream by applying a function, producing a new KStream. It can be used to transform both key and value. If you only want to transform the value, use the mapValues method. The flatMap method can return multiple records (KeyValue pairs) per input record.
Let's look at a few examples.
map can be used to convert the key and value of each KStream record to lowercase Strings:
KStream<String, String> words = builder.stream("words");
words.map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
    @Override
    public KeyValue<String, String> apply(String key, String val) {
        return new KeyValue<>(key.toLowerCase(), val.toLowerCase());
    }
});
Or, just use mapValues to work with values only:
words.mapValues(new ValueMapper<String, String>() {
    @Override
    public String apply(String val) {
        return val.toLowerCase();
    }
});
Using flatMap, you can break one record down into multiple records, for example by splitting a value into a collection of values:
stream.flatMap(new KeyValueMapper<String, String, Iterable<? extends KeyValue<? extends String, ? extends String>>>() {
    @Override
    public Iterable<? extends KeyValue<? extends String, ? extends String>> apply(String key, String val) {
        // split the comma-separated value and emit one record per element, reusing the key
        String[] values = val.split(",");
        return Arrays.stream(values)
                .map(value -> new KeyValue<>(key, value))
                .collect(Collectors.toList());
    }
});
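If the key should remain untouched, the flatMapValues variant provides the same fan-out for values only. A minimal sketch, reusing the comma-separated input from above:
stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    @Override
    public Iterable<String> apply(String val) {
        // one comma-separated value becomes multiple records with the same key
        return Arrays.asList(val.split(","));
    }
});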
filter
The filter method only includes records in a KStream that fulfill a specific criterion. It is complemented by the filterNot method, which is used to exclude records. In both cases, the filtering criterion is defined using a Predicate object.
For example, to only process transactions of a specific credit card type:
KStream<String, Transaction> transactions = builder.stream("user-transactions");
transactions.filter(new Predicate<String, Transaction>() {
    @Override
    public boolean test(String userID, Transaction tx) {
        // keep only VISA transactions
        return tx.cardType().equals("VISA");
    }
});
To exclude all the users who have already set their password (keeping only those who haven't):
KStream<String, User> users = builder.stream("users");
users.filterNot(new Predicate<String, User>() {
    @Override
    public boolean test(String userID, User user) {
        // records that satisfy this predicate are dropped
        return user.isPwdSet();
    }
});
groupBy and groupByKey
Grouping operations are often used to convert the contents of a KStream into a KGroupedStream in order to perform stateful computations (covered later in this guide). This can be achieved using groupByKey or the more generic groupBy method.
While using groupByKey is straightforward, note that a KeyValueMapper can be used with groupBy to choose a different key. For example, you can use it to group user transactions based on card type:
KStream<String, User> transactions = builder.stream("transactions");
KGroupedStream<String, User> grouped = transactions.groupBy(new KeyValueMapper<String, User, String>() {
    @Override
    public String apply(String txID, User user) {
        // the card type becomes the new grouping key
        return user.getCardType();
    }
});
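For comparison, groupByKey needs no mapper since it reuses the existing record key:
// group the records by their current key, keeping the User values as-is
KGroupedStream<String, User> groupedByKey = transactions.groupByKey();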
to and through
The to method is different from some of the other operations you've encountered so far. It's quite simple, yet very powerful, since it lets you materialize (store) KStream records to another Kafka topic with a single method call! It returns void instead of a KStream (or KTable); operations of this kind are also known as terminal operations.
In this example, all the lowercase words are sent to a topic named lowercase-words after being transformed using the mapValues operation (which was covered earlier):
KStream<String, String> words = builder.stream("words");
words.mapValues(new ValueMapper<String, String>() {
    @Override
    public String apply(String val) {
        return val.toLowerCase();
    }
})
.to("lowercase-words");
through is another simple yet powerful operation that is often used to complement the to method when building streaming pipelines. To continue the above example: say that, after storing all the lowercase words in a topic, you need to remove all the words that contain a specific character (e.g. a hyphen, -) and store the final results in another Kafka topic. Instead of calling the to method and creating a new KStream from the topic lowercase-words, it's possible to simplify the code like this:
KStream<String, String> words = builder.stream("words");
words.mapValues(new ValueMapper<String, String>() {
    @Override
    public String apply(String val) {
        return val.toLowerCase();
    }
})
.through("lowercase-words")
.filter(new Predicate<String, String>() {
    @Override
    public boolean test(String k, String v) {
        // drop the words that contain a hyphen
        return !v.contains("-");
    }
})
.to("processed-words");
print and peek
If you want to log the KStream records (for debugging purposes), print is a handy method (it's also a terminal operation, just like to). It's also possible to configure the behavior of this method using a Printed object (which print accepts).
For example, to log the values of a KStream to standard output:
KStream<String, String> words = builder.stream("words");
words.mapValues(new ValueMapper<String, String>() {
    @Override
    public String apply(String val) {
        return val.toLowerCase();
    }
})
.print(Printed.<String, String>toSysOut().withLabel("demo"));
The peek method is similar to print in terms of functionality, but it's not a terminal operation. Instead, it allows the caller to provide a ForeachAction defining the specific action, and returns the same KStream instance.
In this example, we simply log the key and value to standard out:
KStream<String, String> words = builder.stream("words");
words.mapValues(new ValueMapper<String, String>() {
    @Override
    public String apply(String val) {
        return val.toLowerCase();
    }
})
.peek(new ForeachAction<String, String>() {
    @Override
    public void apply(String k, String v) {
        System.out.println("key is " + k + ", value is " + v);
    }
})
.to("lowercase-words");
merge
If you have two streams and need to combine them, use merge:
KStream<String, String> fte = builder.stream("fte");
KStream<String, String> contractor = builder.stream("contractors");
fte.merge(contractor).to("all-employees");
Kafka Streams Stateful functions
This section covers aggregation operations (aggregate, count, and reduce) along with an overview of windowing in Kafka Streams. A side effect of all these operations is "state" (hence the name stateful operations), and it's important to understand where it's stored and how it's managed.
The state associated with these operations is stored in local "state stores", either in-memory or on disk. This "data locality" makes the processing much more efficient. You can also configure your application so that the state store data is sent to Kafka topics (known as changelog topics). This is important for high availability and fault tolerance, since the data can be restored from Kafka if the application fails or crashes.
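For example, the changelog behavior of a state store can be tuned through a Materialized instance. This is only a sketch; the store name and the retention override below are placeholder assumptions:
// back the "demo-store" state store with a changelog topic and override one topic-level config
Materialized.as("demo-store")
    .withLoggingEnabled(Collections.singletonMap("retention.ms", "86400000"));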
Let's go over some of these stateful operations.
count
KGroupedStream supports this operation, which makes it convenient to count the number of records per key with a single method call. Continuing with the groupBy example presented earlier: once the transactions are grouped by card type, count can be used to get the number of transactions for each card type.
KStream<String, User> transactions = builder.stream("transactions");
KGroupedStream<String, User> grouped = transactions.groupBy(new KeyValueMapper<String, User, String>() {
    @Override
    public String apply(String txID, User user) {
        return user.getCardType();
    }
});
KTable<String, Long> txPerCardType = grouped.count();
In order to store this state (the count) locally, count accepts an instance of Materialized, which can be used as follows:
KTable<String, Long> txPerCardType = grouped.count(Materialized.as("tx-per-card-type"));
aggregate
aggregate comes in handy when executing computations like a moving average over a streaming data set. Such computations are stateful: they have to take into account both the incoming record and the current value of the computed aggregate.
A good way to understand aggregate is to use it to implement the count operation. When the first record is received, the Initializer is invoked to initialize the state (in this example, the count is set to zero). After that, the Aggregator takes over; in this example, whenever a record is received, the current count is incremented by one.
KStream<String, String> stream = builder.stream("transactions");
KTable<String, Result> aggregate = stream.groupByKey()
    .aggregate(new Initializer<Result>() {
        @Override
        public Result apply() {
            // invoked once per key: start counting from zero
            return new Result("", 0);
        }
    }, new Aggregator<String, String, Result>() {
        @Override
        public Result apply(String k, String v, Result count) {
            // invoked for every record: increment the running count
            Integer currentCount = count.getCount();
            return new Result(k, currentCount + 1);
        }
    });
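Note that Result is not part of Kafka Streams; it's just a small holder class assumed by this example, along the lines of:
// hypothetical value holder used by the aggregate example above
public class Result {
    private final String key;
    private final int count;

    public Result(String key, int count) {
        this.key = key;
        this.count = count;
    }

    public int getCount() {
        return count;
    }
}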
reduce
The reduce operation can be used to combine streams of values and implement sum, min, max, etc. You can think of aggregate as a generalized version of reduce: with reduce, the result must have the same type as the record values, while aggregate can produce a result of a different type.
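As an illustration, here is how reduce might compute a running maximum per key. This is a sketch that assumes a stream of Integer values on a hypothetical numbers topic:
KStream<String, Integer> numbers = builder.stream("numbers");
KTable<String, Integer> maxPerKey = numbers.groupByKey()
    .reduce(new Reducer<Integer>() {
        @Override
        public Integer apply(Integer current, Integer incoming) {
            // keep whichever value is larger; the result type must match the value type
            return Math.max(current, incoming);
        }
    });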
Windowing with Kafka Streams
A common requirement for website analytics is to have metrics about the number of unique page views per hour, clicks per minute, etc. Windowing lets you confine stream processing operations to execute within a time range. Supported window types include sliding, tumbling, hopping, and session windows.
For counting unique page views per hour, you can use a tumbling time window of 60 minutes. Thus, page views for a product from 1 PM to 2 PM will be aggregated and a fresh time block will start after that. Here is an example of how you might achieve this:
KStream<Product, Long> views = builder.stream("product-views");
views.groupByKey()
    // tumbling window: non-overlapping, fixed 60-minute blocks
    .windowedBy(TimeWindows.of(Duration.ofMinutes(60)))
    .count()
    .toStream()
    .to("views-per-hour");
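The other window types follow the same pattern. For instance, a hopping window is a fixed-size window that advances by less than its size (so consecutive windows overlap), while a session window closes after a period of inactivity. A sketch of both, with placeholder durations:
// hopping window: 60 minutes long, starting a new window every 15 minutes
TimeWindows.of(Duration.ofMinutes(60)).advanceBy(Duration.ofMinutes(15));
// session window: a session ends after 5 minutes of inactivity
SessionWindows.with(Duration.ofMinutes(5));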
Conclusion
This guide provided an introduction to Kafka Streams and the types of APIs it offers, followed by coverage of commonly used stateless and stateful operations, along with examples. You can refer to the Kafka Streams Javadocs and the Kafka documentation for further reading.