Getting started with Kafka Streams, part 2
This article follows directly from Part 1, and uses the same set-up and basic code. It covers the rudiments of stateful operations, time-windowed operations, and aggregation. That might sound like an awful lot of material but, in fact, all three topics can be demonstrated in a simple application. If you haven’t read the first article, however, I would recommend doing so – I’m not sure this one will make any sense, otherwise.
All the examples in Part 1 were stateless; that is, they worked on one record at a time, and nothing depended on anything that happened in the past. Aggregation and counting are, however, inherently stateful – they depend critically on the history of the messages that have been processed up to a particular point.
In this example, I will implement a simple count of records with the same key. This could be genuinely useful. Suppose, for example, that you have a transactional web-based system for trading certain items. Each time a trade happens, the system writes a record to Kafka whose key is the ID of the user who made it. Or perhaps (or, in addition) the system writes a record indicating the commodity that was traded, and how much.
We can split the message stream from the relevant topics by key, then use a counter to build a table of totals for each key. Then we can, perhaps, feed the totals back into another Kafka topic, or write them to a database table.
Note
In this article, I will treat counting as a special kind of aggregation. In both cases, we build a two-dimensional structure from a one-dimensional message flow. The difference is that, with aggregation, there will usually be some further processing; with a counting procedure, the counts are generally the final result. Both procedures, counting and aggregation, use essentially the same structure in Kafka Streams.
Aggregation is generally by key in Kafka Streams. That is, we assume that the message key means something. It might be a user ID, or a product ID, or something else, but it has to be something. If the messages have no key, then usually the first step in aggregation will be to derive one.
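If you do need to derive a key before grouping, the selectKey() operation is one way to do it. The following is a minimal sketch, assuming the message value is a comma-separated string whose first field is a user ID; the topic name and field layout are invented for illustration.

import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;

// Derive a key from the message value, then group by that key.
// selectKey() marks the stream for repartitioning, so the grouping
// that follows really is by the newly derived key.
KStream<String, String> trades = builder.stream ("trades-input");
KGroupedStream<String, String> byUser = trades
    .selectKey ((key, value) -> value.split(",")[0]) // first CSV field becomes the key
    .groupByKey();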
KStreams and KTables
To understand counting and aggregation, it’s crucial to be clear on the distinction between a KStream and a KTable. Frankly, though, I can do little better to illustrate this than to recommend the graphical demonstration of the duality between tables and streams in the Confluent documentation.
In brief, a KTable is a two-dimensional, key-value structure, which is generated by treating individual messages as entries in a change log. That is, each row in the table has a key that matches the message key, and the value is the latest value received from the stream. If you’ve used Debezium to capture database changes into Kafka, you’ll already be familiar with the idea that a stream of messages can capture the state of a database table as it changes over time. It’s less obvious, however, that the reverse is also true – if we have a log of changes to a table, we can always ‘replay’ the changes to reconstruct the original table.
So Kafka Streams assumes a kind of equivalence between message streams and tables – it’s always possible to construct one from the other. However, some Streams APIs are best expressed in terms of tables, rather than streams.
Note
The equivalence between KTable and KStream is not sufficiently close that they can be derived from a common base class, unfortunately. The two classes have some methods in common, but not all.
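For what it’s worth, the DSL does let you move between the two representations. Here is a minimal sketch; the topic name is a placeholder, and toTable() needs a reasonably recent Kafka Streams release.

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

// A topic read as a change log: each new message with a given key
// replaces the previous value for that key.
KTable<String, String> table = builder.table ("some-compacted-topic");

// The same table viewed as a stream of change-log records.
KStream<String, String> changes = table.toStream();

// And a stream can be turned back into a table.
KTable<String, String> rebuilt = changes.toTable();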
The sample application
I’ll describe how to code and test the application first, and then explain how it works.
Go back to the original Pipe.java application, created from the kafka-streams-quickstart Maven archetype. Replace the definition of the topology with the following:
builder.stream("streams-plaintext-input")
.groupByKey().count().toStream()
.peek ((k,v)->System.out.println (k + ": " + v));
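For context, this is roughly how the whole Pipe.java looks with the new topology in place. It is only a sketch based on the quickstart structure from Part 1; your package name, application ID, and broker address may differ.

package me.kevinboone.apacheintegration.kstreams_test;

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class Pipe {
    public static void main (String[] args) {
        Properties props = new Properties();
        props.put (StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put (StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put (StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put (StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream ("streams-plaintext-input")
            .groupByKey().count().toStream()
            .peek ((k,v)->System.out.println (k + ": " + v));

        KafkaStreams streams = new KafkaStreams (builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook (new Thread (streams::close));
    }
}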
To test the application we will need to send messages to the streams-plaintext-input Kafka topic. There is no output topic – in this case output is only to the console.
I’m assuming that you still have Kafka set up, and the input topic exists. So run the application using Maven, as before:
mvn compile exec:java -Dexec.mainClass=me.kevinboone.apacheintegration.kstreams_test.Pipe
Of course, use your own package name if you didn’t copy the steps in Part 1 exactly, and chose your own naming.
To exercise the application, you’ll need to send messages with specific keys. It’s a little fiddly to do this with kafka-console-producer.sh, but possible. You need to specify a key-value separator, like this (I have chosen the colon, but any character will be fine):
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic streams-plaintext-input --property parse.key=true \
--property key.separator=:
Now enter some key-value pairs at the prompt, for example:
dog:asdasd
cat:xxxx
dog:yyy
budgie:asdasdasd
The actual message body is irrelevant in this example, because the application does not process or even display the bodies – it just counts messages with specific keys.
As the Streams application runs it will eventually produce output of this form:
dog: 2
cat: 1
budgie: 1
That is, there have (so far) been two messages with key dog, and one each with cat and budgie. If you send more messages, you’ll see updates with the new totals.
You might notice that updates are not immediate. It might take up to 30 seconds. This is the time interval at which Kafka Streams commits open streams. The other time at which you might see some output is when the aggregation cache is full; but, since the default size is 10 MB, that’s unlikely to happen here. Both the commit interval and the cache size can be changed when the KafkaStreams application instance is created.
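For example, here is a sketch of how those settings might be adjusted in the properties passed to KafkaStreams; depending on your Kafka Streams version, the cache property may be called statestore.cache.max.bytes rather than cache.max.bytes.buffering.

// Alongside the other StreamsConfig properties in Pipe.java:
// commit (and so flush output) every five seconds instead of every 30.
props.put (StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);

// Shrink the record cache so aggregation results are forwarded sooner.
// Zero disables caching entirely; convenient for testing, but
// inefficient in production.
props.put (StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);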
Analysing the application
The sample application is mostly trivial, but there’s a lot going on. Let’s look at it step by step.
The method call builder.stream("streams-plaintext-input") creates an instance of KStream that models the raw message stream from the Kafka topic. Calling groupByKey() on this object creates an instance of KGroupedStream. This models a message stream that can be differentiated by its keys.
There are essentially three things that can be done with a KGroupedStream (all three are sketched in the code after this list):
- Create a counter on it, or
- create an aggregator on it, or
- create a new grouped stream with specific time-windowing properties.
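Here is a minimal sketch of all three, assuming string keys and values. The aggregator just totals the lengths of the message bodies, which is arbitrary but shows the shape of the API; the windowing call needs a reasonably recent Kafka Streams release (older ones use TimeWindows.of()).

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

KStream<String, String> input = builder.stream ("streams-plaintext-input");
KGroupedStream<String, String> grouped = input.groupByKey();

// 1. A counter: one row per key, whose value is the number of records seen.
KTable<String, Long> counts = grouped.count();

// 2. An aggregator: here, a running total of message-body lengths per key.
KTable<String, Integer> totalLengths = grouped.aggregate (
    () -> 0,                                        // initial value for each key
    (key, value, total) -> total + value.length(),  // combine a new record with the total
    Materialized.with (Serdes.String(), Serdes.Integer()));

// 3. A time-windowed grouped stream, which can itself be counted or aggregated.
TimeWindowedKStream<String, String> windowed =
    grouped.windowedBy (TimeWindows.ofSizeWithNoGrace (Duration.ofMinutes (5)));
KTable<Windowed<String>, Long> windowedCounts = windowed.count();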
In the simple example, I just call count() to create a new counter. This method returns a KTable. This is a two-dimensional representation of the data stream, where the first column is the message key, and the second (in this case) is the count.
For the purposes of this exercise, it would be nice if there were a method KTable.peek() that extracts the table’s contents, as KStream.peek() does for streams. So far as I know, however, there is no such method. What we can do, however, is convert the KTable back to a KStream, and call peek() on that.
Looking at the operation of the application in more detail
If the Kafka Streams application is running, try stopping it and starting it again. Then send some more messages to streams-plaintext-input, with the same keys as before. You should notice that the totals include values from the previous run. By default, the aggregation is not merely stateful, it is persistent as well.
It’s reasonable to wonder where this persistent state is stored. By default it is stored on disk in a RocksDB database. The location of the store is, by default,
/tmp/kafka-streams/streams_pipe/0_0
streams_pipe, you may recall, is the application name, set in the APPLICATION_ID_CONFIG configuration property. The numbers in the directory name reflect the fact that the topology might include multiple persistent elements, and they need their own data stores.
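The location can be changed, if need be, with the state.dir property; the path here is just an example.

// Alongside the other StreamsConfig properties in Pipe.java:
props.put (StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kstreams_test/state");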
Storing data this way is adequate when there is only one instance of the client. But in a clustered set-up, where the application may run on multiple hosts, what happens if one of the application instances crashes, or is shut down, in the middle of an aggregation? The local file store is not available to other instances.
The solution is to back up the aggregation store on the Kafka broker itself. Look at the topics on the broker, after running the application:
$ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
streams-pipe-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog
streams-pipe-output
streams-pipe-output-long
streams-plaintext-input
The aggregation store is kept in a Kafka topic in the form of a change log. If necessary, the local aggregation store can be reconstructed from this change log.
It is this Kafka-based back-up of the aggregation store that makes it safe to keep the local store in memory, rather than in a RocksDB database. The WordCount example application shows a way to do that.
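I won’t reproduce that code here but, broadly, one way to do it (not necessarily exactly how WordCount does it) is to pass count() a Materialized backed by an in-memory store supplier. A minimal sketch, with an arbitrary store name:

import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

// Keep the counts in an in-memory store instead of the default RocksDB
// store; the change-log topic on the broker still provides fault tolerance.
builder.stream ("streams-plaintext-input")
    .groupByKey()
    .count (Materialized.as (Stores.inMemoryKeyValueStore ("counts-store")))
    .toStream()
    .peek ((k,v)->System.out.println (k + ": " + v));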
Despite the persistence, you might find that if you leave the application running long enough, the aggregation totals get reset. That’s because the aggregation process has a time window of one hour by default. In practice, applications will usually seek to total over a specific time window. The Streams API provides classes like TimeWindowedKStream to do this.
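As a hedged sketch, a windowed count might look like the following. The five-minute window size and the key formatting are arbitrary choices of mine, and TimeWindows.ofSizeWithNoGrace() needs a reasonably recent Kafka Streams release (older versions use TimeWindows.of()).

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.TimeWindows;

// Inside the topology definition: count records per key in five-minute windows.
builder.<String, String>stream ("streams-plaintext-input")
    .groupByKey()
    .windowedBy (TimeWindows.ofSizeWithNoGrace (Duration.ofMinutes (5)))
    .count()
    .toStream()
    // The key is now a Windowed<String>; flatten it into something readable.
    .map ((windowedKey, count) -> KeyValue.pair (
        windowedKey.key() + "@" + windowedKey.window().startTime(), count))
    .peek ((k,v)->System.out.println (k + ": " + v));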
A technical digression
There’s one more thing to demonstrate in this article, and it’s a rather technical one, but important once the application gets beyond the “Hello, World” stage.
In the application we used toStream() to convert the table of counts back to a stream, and then peek() to print it. It should be possible to send the converted stream to another Kafka topic and, indeed, it is – but the results might be surprising. Try adding a to() call to the application:
builder.stream("streams-plaintext-input")
.groupByKey().count().toStream()
.peek ((k,v)->System.out.println (k + ": " + v))
.to ("streams-pipe-output-counts");
Look at the output using kafka-console-consumer.sh. You’ll need to enable the printing of keys to see anything:
./bin/kafka-console-consumer.sh --property print.key=true \
--bootstrap-server localhost:9092 --topic streams-pipe-output-counts
If you send some more data to streams-plaintext-input, you should see some output from the consumer. But you’ll probably find that the totals are missing. Why?
I’ve said that the count() method returns a KTable, but KTable is a generic class, as KStream is: both are parameterized by key and value types. That is, a KTable isn’t just a table of objects; it’s a table of specific key and value classes. In most of the previous examples I’ve glossed over this fact, and often it’s not important. In this case, however, it is.
So the count() method doesn’t really return a KTable – it returns (in this case) a KTable<?, Long>. The type of the key is unspecified, because the code doesn’t say what it should be. At runtime the key class will be String, because that’s the way the serialization is set up. But the ‘value’ column of the count table is parameterized as a Long, whatever the type of the stream it was derived from.
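Incidentally, if you would rather the key type were explicit than left as ‘?’, you can spell the types out when building the topology. A sketch:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

// Make the key and value types explicit, rather than relying on the
// default serde configuration alone.
KStream<String, String> input =
    builder.stream ("streams-plaintext-input", Consumed.with (Serdes.String(), Serdes.String()));
KTable<String, Long> counts = input.groupByKey().count();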
Then, when we call toStream() on the output of count(), what we actually get is an instance of KStream<?, Long>.
This makes perfect sense – a count can only be a number. But when we write to the output stream, we’re writing messages whose payload is a Long, and kafka-console-consumer.sh assumes by default that the values are strings.
So to see the counts, we need to run the consumer like this:
./bin/kafka-console-consumer.sh --property print.key=true \
--bootstrap-server localhost:9092 --topic streams-pipe-output-counts \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
If you want to store the counts in the Kafka topic as strings, you’ll need to make a conversion in code.
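For example, here is a hedged sketch that maps each count to its string representation before writing it, and tells to() explicitly which serdes to use.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Produced;

builder.<String, String>stream ("streams-plaintext-input")
    .groupByKey().count().toStream()
    .peek ((k,v)->System.out.println (k + ": " + v))
    // Convert the Long count into a String payload...
    .mapValues (count -> Long.toString (count))
    // ...and say explicitly how the output should be serialized.
    .to ("streams-pipe-output-counts", Produced.with (Serdes.String(), Serdes.String()));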
Closing remarks
In this article I demonstrated the rudiments of stateful operations (counts and aggregations) in Kafka Streams. There is, of course, a lot more to this topic. In particular, the API provides powerful control of time windowing, and a configurable notion of what a ‘timestamp’ amounts to in the application. The difference between, for example, the time at which an event was detected, and the time it was ingested into Kafka, might only amount to microseconds; but with the throughputs that Kafka is designed for, that difference might be important.