Getting started with Kafka Streams, part 2

This article follows directly from Part 1, and uses the same set-up and basic code. It covers the rudiments of stateful operations, time-windowed operations, and aggregation. That might sound like an awful lot of material but, in fact, all three topics can be demonstrated in a simple application. If you haven’t read the first article, however, I would recommend doing so – I’m not sure this one will make any sense, otherwise.

All the examples in Part 1 were stateless; that is, they worked on one record at a time, and nothing depended on anything that happened in the past. Aggregation and counting are, however, inherently stateful – they depend critically on the history of the messages that have been processed up to a particular point.

In this example, I will implement a simple count of records with the same key. This could be genuinely useful. Suppose, for example, that you have a transactional web-based system for trading certain items. Each time a trade happens, the system writes a record to Kafka whose key is the ID of the user who made it. Or perhaps (or, in addition) the system writes a record indicating the commodity that was traded, and how much.

We can split the message stream from the relevant topics by key, then use a counter to build a table of totals for each key. Then we can, perhaps, feed the totals back into another Kafka topic, or write them to a database table.
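To make the idea concrete before bringing Kafka into it, here is a minimal plain-Java sketch of what "split by key, then count" computes. It does not use the Streams API at all; the class name, the method name and the user IDs are my own inventions for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of "group by key, then count"; no Kafka involved.
final class KeyCountSketch {

    // Returns the total number of records seen for each key,
    // in order of each key's first appearance.
    static Map<String, Long> countByKey(List<String> keys) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (String key : keys) {
            // Start at 1 for a new key, otherwise add 1 to the existing total.
            totals.merge(key, 1L, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical user IDs, one per trade record.
        List<String> tradeKeys = List.of("alice", "bob", "alice", "carol", "alice");
        System.out.println(countByKey(tradeKeys));
    }
}
```

The map plays the role of the table of totals. What Kafka Streams adds on top of this is that the table is updated continuously, survives restarts, and can be shared between instances.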

Note
In this article, I will treat counting as a special kind of aggregation. In both cases, we build a two-dimensional structure from a one-dimensional message flow. The difference is that, with aggregation, there will usually be some further processing; with a counting procedure, the counts are generally the final result. Both procedures, counting and aggregation, use essentially the same structure in Kafka Streams.

Aggregation is generally by key in Kafka Streams. That is, we assume that the message key means something. It might be a user ID, or a product ID, or something else, but it has to be something. If the messages have no key, then usually the first step in aggregation will be to derive one.
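As an illustration of deriving a key and then aggregating (rather than merely counting), here is a plain-Java sketch that totals quantities per commodity. The record body format "commodity,quantity" is purely an assumption of mine, as are the class and method names; in Kafka Streams itself the key-deriving step would usually be something like selectKey() or groupBy() on the stream.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of "derive a key, then aggregate by it"; no Kafka involved.
final class DerivedKeySumSketch {

    // Hypothetical body format: "<commodity>,<quantity>", for example "copper,25".
    static Map<String, Long> totalByCommodity(List<String> bodies) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (String body : bodies) {
            String[] fields = body.split(",", 2);
            String commodity = fields[0].trim();              // the derived key
            long quantity = Long.parseLong(fields[1].trim()); // the value to aggregate
            // Add this quantity to the running total for the commodity.
            totals.merge(commodity, quantity, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        System.out.println(totalByCommodity(List.of("copper,25", "tin,5", "copper,10")));
    }
}
```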

KStreams and KTables

To understand counting and aggregation, it’s crucial to be clear on the distinction between a KStream and a KTable. Frankly, though, I can do little better to illustrate this than to recommend the graphical demonstration of the duality between tables and streams in the Confluent documentation.

In brief, a KTable is a two-dimensional, key-value structure, which is generated by treating individual messages as entries in a change log. That is, each row in the table has a key that matches the message key, and the value is the latest value received from the stream. If you’ve used Debezium to capture database changes into Kafka, you’ll already be familiar with the idea that a stream of messages can capture the state of a database table as it changes over time. It’s less obvious, however, that the reverse is also true – if we have a log of changes to a table, we can always ‘replay’ the changes to reconstruct the original table.

So Kafka Streams assumes a kind of equivalence between message streams and tables – it’s always possible to construct one from the other. However, some Streams APIs are best expressed in terms of tables, rather than streams.
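The 'replay' idea can be sketched in a few lines of plain Java. This is only a model of the principle, not of how Kafka Streams implements a KTable; the class and method names are made up, and the sample keys and values are arbitrary.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of building a table by replaying a change log.
final class ChangelogReplaySketch {

    // Each entry is one change: a key and the new value for that key.
    // Later entries for the same key overwrite earlier ones.
    static Map<String, String> replay(List<Map.Entry<String, String>> changes) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Map.Entry<String, String> change : changes) {
            table.put(change.getKey(), change.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> changes = List.of(
                Map.entry("dog", "first"),
                Map.entry("cat", "second"),
                Map.entry("dog", "third"));
        // The table keeps only the latest value for each key.
        System.out.println(replay(changes));
    }
}
```

Going the other way, every put() on the table could be logged as a message, which is the sense in which a table and a stream of changes carry the same information.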

Note
The equivalence between KTable and KStream is, unfortunately, not sufficiently close that they can be derived from a common base type. These interfaces have some methods in common, but not all.

The sample application

I’ll describe how to code and test the application first, and then explain how it works.

Go back to the original Pipe.java application, created from the kafka-streams-quickstart Maven archetype. Replace the definition of the topology with the following:

  builder.stream("streams-plaintext-input")
        .groupByKey().count().toStream()
        .peek ((k,v)->System.out.println (k + ": " + v));

To test the application we will need to send messages to the streams-plaintext-input Kafka topic. There is no output topic – in this case output is only to the console.

I’m assuming that you still have Kafka set up, and the input topic exists. So run the application using Maven, as before:

 mvn compile exec:java -Dexec.mainClass=me.kevinboone.apacheintegration.kstreams_test.Pipe

Of course, use your own package name if you didn’t copy the steps in Part 1 exactly, and chose your own naming.

To exercise the application, you’ll need to send messages with specific keys. It’s a little fiddly to do this with kafka-console-producer.sh, but possible. You need to specify a key-value separator, like this (I have chosen the colon, but any character will be fine):

./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \
   --topic streams-plaintext-input --property parse.key=true \
   --property key.separator=:

Now enter some key-value pairs at the prompt, for example:

dog:asdasd
cat:xxxx
dog:yyy
budgie:asdasdasd

The actual message body is irrelevant in this example, because the application does not process or even display the bodies – it just counts messages with specific keys.

As the Streams application runs it will eventually produce output of this form:

dog: 2
cat: 1
budgie: 1

That is, there have (so far) been two messages with key dog, and one each with cat and budgie. If you send more messages, you’ll see updates with the new totals.

You might notice that updates are not immediate; they might take up to 30 seconds to appear. This is the default commit interval, the period at which Kafka Streams commits its progress and flushes buffered aggregation results downstream. The other time at which you might see some output is when the aggregation cache is full; but, since the default size is 10 MB, that’s unlikely to happen here. Both the commit interval and the cache size can be changed in the configuration properties passed in when the KafkaStreams application instance is created.
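For experimenting, it can be convenient to see updates sooner. The sketch below shows the two settings as plain property names, so it compiles without the Kafka libraries; in the real application you would add them to the Properties object that already holds the application ID and bootstrap servers. The values are arbitrary choices of mine. As far as I know the commit interval is commit.interval.ms (StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); the cache property shown here has been renamed in newer Kafka releases, so check the StreamsConfig documentation for your version.

```java
import java.util.Properties;

// Sketch of the two tuning properties discussed above, using literal names.
final class StreamsTuningSketch {

    static Properties tunedProperties() {
        Properties props = new Properties();
        // Commit (and so flush results downstream) every second
        // rather than every 30 seconds.
        props.put("commit.interval.ms", "1000");
        // A cache size of zero disables buffering, so every update is
        // forwarded individually. Assumption: this is the property name
        // in your Kafka version; newer releases use a different one.
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        tunedProperties().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Settings like these are fine for a demonstration, but they increase the number of updates written downstream, so they are probably not what you would want in production.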

Analysing the application

The sample application is mostly trivial, but there’s a lot going on. Let’s look at it step by step.

The method call

builder.stream("streams-plaintext-input")

creates an instance of KStream that models the raw message stream from the Kafka topic. Calling groupByKey() on this object creates an instance of KGroupedStream. This models a message stream that can be differentiated by its keys.

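There are essentially three things that can be done with a KGroupedStream: count its records by key with count(), combine them into some other per-key result with reduce() or aggregate(), or apply a time window with windowedBy() before doing either.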

In the simple example, I just call count() to create a new counter. This method returns a KTable. This is a two-dimensional representation of the data stream, where the first column is the message key, and the second (in this case) is the count.

For the purposes of this exercise, it would be nice if there were a method KTable.peek() that extracts the table’s contents, as KStream.peek() does for streams. So far as I know, there is no such method. What we can do, however, is convert the KTable back to a KStream, and call peek() on that.

Looking at the operation of the application in more detail

If the Kafka Streams application is running, try stopping it and starting it again. Then send some more messages to streams-plaintext-input, with the same keys as before. You should notice that the totals include values from the previous run. By default, the aggregation is not merely stateful, it is persistent as well.

It’s reasonable to wonder where this persistent state is stored. By default it is stored on disk in a RocksDB database. The location of the store is, by default,

/tmp/kafka-streams/streams-pipe/0_0

streams-pipe, you may recall, is the application name, set in the APPLICATION_ID_CONFIG configuration property. The numbers in the directory name identify a stream task: the first is the number of the sub-topology, and the second is the partition of the input topic that the task handles. Each task gets its own data store.

Storing data this way is adequate when there is only one instance of the client. But in a clustered set-up, where the application may run on multiple hosts, what happens if one of the application instances crashes, or is shut down, in the middle of an aggregation? The local file store is not available to other instances.

The solution is to back up the aggregation store on the Kafka broker itself. Look at the topics on the broker, after running the application:

$ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

streams-pipe-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog
streams-pipe-output
streams-pipe-output-long
streams-plaintext-input

The aggregation store is kept in a Kafka topic in the form of a change log. If necessary, the local aggregation store can be reconstructed from this change log.
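The long topic name in the listing is not arbitrary. It appears to follow the pattern of application ID, then state store name, then the suffix "changelog"; the store name here is an automatically generated one, because the call to count() was not given a name. A trivial sketch of the pattern, with a helper name of my own invention:

```java
// Sketch of how the change log topic name in the listing is composed.
final class ChangelogNameSketch {

    // Joins the application ID and the state store name in the
    // "<application ID>-<store name>-changelog" pattern.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        System.out.println(
                changelogTopic("streams-pipe", "KSTREAM-AGGREGATE-STATE-STORE-0000000001"));
    }
}
```

One practical consequence is that changing the application ID makes the application look for a different change log topic, so it starts with empty totals.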

It is this Kafka-based back-up of the aggregation store that makes it safe to keep the local store in memory, rather than in a RocksDB database. The WordCount example application shows a way to do that.

Despite the persistence, bear in mind what is being counted here. A plain count() is not windowed: the totals accumulate for as long as the state store and its change log exist, and they are not reset after any default period. In practice, applications will usually seek to total over a specific time window. The Streams API provides classes like TimeWindowedKStream to do this.
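Totalling over a time window amounts to adding a time bucket to the grouping key. Here is a minimal plain-Java sketch, assuming fixed-size, non-overlapping windows aligned to the epoch and non-negative millisecond timestamps. The names and the "key@windowStart" labelling are my own; in Kafka Streams itself the corresponding step is a windowedBy() call on the grouped stream, which is what produces a TimeWindowedKStream.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of counting per key and per fixed-size time window.
final class WindowedCountSketch {

    // Start of the window that contains the given timestamp.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Each record is a (key, timestamp in ms) pair. The result maps
    // "key@windowStart" to the number of records in that window.
    static Map<String, Long> countPerWindow(List<Map.Entry<String, Long>> records,
                                            long windowSizeMs) {
        Map<String, Long> totals = new TreeMap<>();
        for (Map.Entry<String, Long> record : records) {
            String bucket = record.getKey() + "@"
                    + windowStart(record.getValue(), windowSizeMs);
            totals.merge(bucket, 1L, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> records = List.of(
                Map.entry("dog", 1_000L),
                Map.entry("dog", 59_000L),
                Map.entry("dog", 61_000L));
        // One-minute windows: two records fall in the first, one in the second.
        System.out.println(countPerWindow(records, 60_000L));
    }
}
```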

A technical digression

There’s one more thing to demonstrate in this article, and it’s a rather technical one, but important once the application gets beyond the “Hello, World” stage.

In the application we used toStream() to convert the table of counts back to a stream, and then peek() to print it. It should be possible to send the converted stream to another Kafka topic and, indeed, it is – but the results might be surprising. Try adding a to() call to the application:

  builder.stream("streams-plaintext-input")
        .groupByKey().count().toStream()
        .peek ((k,v)->System.out.println (k + ": " + v))
        .to ("streams-pipe-output-counts");

Look at the output using kafka-console-consumer.sh. You’ll need to enable the printing of keys to see anything:

./bin/kafka-console-consumer.sh --property print.key=true \
     --bootstrap-server localhost:9092 --topic streams-pipe-output-counts 

If you send some more data to streams-plaintext-input, you should see some output from the consumer. But you’ll probably find that the totals are missing. Why?

I’ve said that the count() method returns a KTable, but KTable is a generic type, as KStream is: both are parameterized by key and value types. That is, a KTable isn’t just a table of objects; it’s a table of specific key and value classes. In most of the previous examples I’ve fudged over this fact, and often it’s not important. In this case, however, it is.

So the count() method doesn’t really return a KTable – it returns (in this case) a KTable<?, Long>. The type of the key is, in this case, unspecified (because the code doesn’t say what it should be). At runtime the key class will be String because that’s the way the serialization is set up. But the ‘value’ column of the count table is parameterized as a Long, whatever the type of the stream it was derived from.

Then, when we call toStream() on the output of count() what we actually get is an instance of KStream<?, Long>.

This makes perfect sense – a count can only be a number. But when we write to the output topic, we’re writing messages whose payload is a Long in binary form, and kafka-console-consumer.sh assumes by default that the values are strings.

So to see the counts, we need to run the consumer like this:

./bin/kafka-console-consumer.sh --property print.key=true \
 --bootstrap-server localhost:9092 --topic streams-pipe-output-counts \
 --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer 

If you want to store the counts in the Kafka topic as strings, you’ll need to make a conversion in code.
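To see why the conversion matters, compare the bytes that end up in the topic in the two cases. This plain-Java sketch assumes the usual encodings: a Long written as eight big-endian bytes, and a string written as UTF-8. The class and method names are mine. In a Streams topology the conversion itself would typically be a mapValues() step ahead of to(), together with a String value serde; that part is not shown here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Compares the binary and textual encodings of a count.
final class LongPayloadSketch {

    // The count as eight big-endian bytes, as a Long serializer writes it.
    static byte[] asLongBytes(long count) {
        return ByteBuffer.allocate(Long.BYTES).putLong(count).array();
    }

    // The count as UTF-8 text, as a String serializer writes it.
    static byte[] asStringBytes(long count) {
        return Long.toString(count).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(asLongBytes(2L)));   // [0, 0, 0, 0, 0, 0, 0, 2]
        System.out.println(Arrays.toString(asStringBytes(2L))); // [50]
    }
}
```

The eight bytes of the binary form are all unprintable control characters when interpreted as text, which is why the console consumer appears to show nothing for the value.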

Closing remarks

In this article I demonstrated the rudiments of stateful operations (counts and aggregations) in Kafka Streams. There is, of course, a lot more to this topic. In particular, the API provides powerful control of time windowing, and a configurable notion of what a ‘timestamp’ amounts to in the application. The difference between, for example, the time at which an event was detected, and the time it was ingested into Kafka, might only amount to microseconds; but with the throughputs that Kafka is designed for, that difference might be important.