Kafka Connect from the ground up

Apache Kafka is an increasingly popular distributed message bus that emphasises robustness and high throughput. Kafka Connect is a framework and infrastructure for exchanging data between Kafka and various other systems, without coding an application. This article describes, step by step, using only a text editor and command-line tools, how to configure and run a simple Kafka Connect connector that sends data from a Kafka topic to a simple text file. To do this, we will use a sample connector called FileStreamSink, which is (or was) part of the standard Kafka distribution.

Note:
The FileStream connectors are precarious, because they don't work well in a clustered environment. Although they are very handy for testing, there have been rumblings about removing them, because they probably aren't safe to use in a production set-up. If that happens, I hope it will be made obvious how to obtain and install the JARs once they are no longer part of the stock Kafka distribution.

About Kafka Connect

Kafka Connect is a process, or set of processes, that runs alongside Kafka. The Kafka Connect infrastructure can be clustered, and it's common to run one Connect worker process on each node that runs a Kafka broker. Each worker will host one or more tasks, each assigned to a specific connector. In this simple example, however, I'll deploy Kafka and Kafka Connect on a single node. Kafka Connect will thus be running in what is known as 'standalone' mode, with one worker and -- in this case -- a single task.
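
For comparison only (I won't be using it here), a clustered set-up starts one distributed-mode worker on each node, using the connect-distributed.sh script and its own configuration file:

$ ./bin/connect-distributed.sh config/connect-distributed.properties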

The Kafka Connect infrastructure can manage many connectors, also known as 'plug-ins'. We typically obtain a connector as a Java JAR file, and install it in a specific directory that the Connect worker process searches at start-up. Source connectors provide data to Kafka, while sink connectors accept data from Kafka and send it somewhere else.

Running a simple connector has two parts. First, we run the Kafka Connect infrastructure itself. Then we will configure and instantiate a particular connector as a task within the infrastructure.

Note:
Strimzi, the implementation of Kafka on Kubernetes, includes an operator that takes care of configuring Kafka Connect in a cluster. Administering such an installation is likely to be completely different from the method I describe here, although the internal operation of Connect is similar.

1. Install a single-node Kafka

If you've installed Kafka already, you can skip this step, of course. However, I'm assuming a single-node installation; additional steps, which I won't be describing here, are needed to run Kafka Connect in clustered mode.

Note:
In this article I assume you're running Kafka on Linux; the exact flavour of Linux is not important.
Note:
I tested these instructions on Kafka 3.5, available from https://kafka.apache.org/downloads, and also on the Red Hat version, AMQ Streams for RHEL, version 2.5.

Unpack the Kafka distribution into a convenient directory, e.g., /opt/kafka. The following assumes you're using that directory; all the commands shown below are run from the Kafka installation directory.
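
If you're starting from a downloaded archive, the unpacking steps look something like the following; the exact archive name depends on the Kafka version and Scala build you downloaded.

$ tar -xzf kafka_2.13-3.5.0.tgz
$ sudo mv kafka_2.13-3.5.0 /opt/kafka
$ cd /opt/kafka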

In this example, I am deliberately using a completely stock configuration for running Kafka. It should not be necessary to make any configuration changes except, perhaps, one: in some installations, you might need to add the Kafka libs/ directory, which contains the connector JARs, to the Kafka Connect plug-in directory list. For standalone operation, this configuration is in config/connect-standalone.properties. Add/edit the line:

plugin.path=/opt/kafka/libs

If you get error messages about missing classes when you come to instantiate the connector, it is probably this configuration that needs attention.
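
If you want to check in advance that the FileStream connector classes are present, look for the relevant JAR in libs/. The exact filename varies with the Kafka version, but it should look something like this:

$ ls libs/ | grep connect-file
connect-file-3.5.0.jar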

2. Start Kafka

Note:
By the time you read this, Kafka will probably be able to operate without Apache Zookeeper as a separate configuration store. At the time of writing, however, it's still usual to run Zookeeper.

Start Zookeeper:

$ ./bin/zookeeper-server-start.sh config/zookeeper.properties

In a separate terminal session, start the Kafka broker:

$ ./bin/kafka-server-start.sh config/server.properties
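
The connector we create later will read from a topic called foo. A stock broker will usually auto-create the topic the first time something writes to it but, if you prefer to create it explicitly, you can do so now:

$ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo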

3. Start Kafka Connect in standalone mode

$ ./bin/connect-standalone.sh ./config/connect-standalone.properties 

Even on a single node, it will take a little time for Connect to start, because it has to search for and inspect the available plug-ins. connect-standalone.properties provides overall configuration for the Connect infrastructure, along with default settings that apply to any connector that does not override them.
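
For reference, the relevant defaults in a stock connect-standalone.properties look roughly like this (the exact contents vary between versions):

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets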

4. Check that Kafka Connect is responding on its REST interface

The REST interface is, by default, on port 8083, and is not encrypted or authenticated. For example, to query for installed connectors we can use curl (or similar) like this:

$ curl localhost:8083/connectors
[]

This query should return an empty list ('[]') because, so far, we have installed no connectors.
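
It's also worth confirming that the FileStream plug-in was found during the start-up scan. The connector-plugins endpoint lists the connector classes the worker knows about; the output below is abbreviated, and the exact contents depend on the version and on what else is installed:

$ curl localhost:8083/connector-plugins
[{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"3.5.0"},...]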

5. Create a JSON file to specify the connector

Create a JSON file compatible with the REST API, for instantiating the FileStreamSink connector. Call the file, for example, my-connector.json.

{
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": "1",
    "topics": "foo",
    "file": "/tmp/foo.sink.txt"
    "errors.tolerance": "all",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
}

Note that this configuration reads from topic foo and writes to the file /tmp/foo.sink.txt. The attribute errors.tolerance indicates that the connector should simply skip messages that it cannot process. value.converter specifies a class that reads the message data from the topic into an internal representation. If it isn't set, the connector uses the Kafka Connect infrastructure's global default, which is set in the worker configuration file; in a stock installation, that default is to treat the incoming data as JSON objects. It's simpler, for testing purposes, just to work with raw text strings.
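
To illustrate the difference: with the stock JSON default (and schemas enabled), each message on the topic would have to be a JSON envelope along these lines, rather than a plain line of text. With StringConverter, any line of text is accepted as-is.

{"schema": {"type": "string"}, "payload": "hello world"}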

Use curl or similar to make a PUT request on the REST API, like this:

$ curl -X PUT --data-binary @my-connector.json -H "Content-Type: application/json"  localhost:8083/connectors/kevin-file-sink/config
Note:
The Content-Type header is not optional here. The API won't work without it.

The name kevin-file-sink in the URI is arbitrary. We can use this name to administer and monitor the connector in later invocations of the REST API. It's also used in logs and diagnostics.
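
The same name appears in the other administrative endpoints. For example, to read back the stored configuration, or to remove the connector again:

$ curl localhost:8083/connectors/kevin-file-sink/config
$ curl -X DELETE localhost:8083/connectors/kevin-file-sink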

6. Check that the connector has been registered

List the connectors, again using the REST API:

$ curl localhost:8083/connectors
["kevin-fle-sink"]

Check that the connector is running:

$ curl localhost:8083/connectors/kevin-file-sink/status
{"name":"kevin-file-sink","connector":{"state":"RUNNING",...}}
Note:
As configured here, this installation using the REST API is not persistent; it will need to be repeated after restarting the Kafka Connect infrastructure.

The status API may, if we are lucky, report error messages related to the connector in case of problems. In practice, though, we will often have to look in Connect's main log file as well.
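
One call that is worth knowing about is the per-task status; if a task has failed outright, the response will usually include a stack trace that explains why:

$ curl localhost:8083/connectors/kevin-file-sink/tasks/0/status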

7. Test the connector

In a different terminal session, watch the contents of the connector's output file:

$ tail -f /tmp/foo.sink.txt

This file should exist as soon as the connector is running.

Now send some data to the Kafka topic using, for example, kafka-console-producer.sh. Remember that the connector is looking at the topic foo.

$ ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic foo

Enter some text at the prompt; each line will generate a new Kafka message. You should see these messages getting appended to the output text file.
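
If nothing appears in the output file, it's worth checking whether the messages at least reached the topic, independently of the connector:

$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo --from-beginning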

Installing the connector at start-up time

An alternative way to install the connector, which may be more appropriate in standalone mode, is to pass the connector specification as a properties file to connect-standalone.sh. For the file sink example, the properties file would look like this:

name=kevin-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/foo.sink.txt
topics=foo
errors.tolerance=all
value.converter=org.apache.kafka.connect.storage.StringConverter

These are (apart from the name) the same properties we used earlier in the JSON file, but in Java properties format.

Run Kafka Connect like this, specifying the properties file:

$ ./bin/connect-standalone.sh ./config/connect-standalone.properties  my-connector.properties 
Note:
For this stock connector, we can use FileStreamSink as the name, rather than the full Java class name.

When we launch Kafka Connect like this, the result is the same as starting the Connect infrastructure and then using the REST API to configure the connector. In particular, a connector defined in a properties file is no more persistent than one defined using the REST API.

Closing remarks

In this article I've explained how to install and run a simple Kafka Connect connector, starting with the installation of Kafka itself, and continuing up to testing that the connector is basically functional.

There is, of course, a huge amount more to Kafka Connect than this. In a production set-up, we would need to think about clustering and the distribution of connector work between nodes. This article hasn't said much about error detection and correction, and I've not even touched on how offsets are handled. Offsets matter, because we don't want the connector to reprocess every message each time it starts. If time allows, I might cover these topics in later articles.

In Part II, I'll explain how to do the same things this article demonstrated, but within an installation of Strimzi on OpenShift.