Kafka Connect from the ground up
Apache Kafka is an increasingly popular distributed message bus that emphasises robustness and high throughput. Kafka Connect is a framework and infrastructure for exchanging data between Kafka and various other systems, without coding an application. This article describes, step by step and using only a text editor and command-line tools, how to configure and run a simple Kafka Connect connector that sends data from a Kafka topic to a plain text file. To do this, we will use a sample connector called FileStreamSink, which is (or was) part of the standard Kafka distribution.
Note:
The FileStream connectors are precarious, because they don't work well in a clustered environment. Although they are very handy for testing, there have been rumblings about removing them, because they probably aren't safe to use in a production set-up. If this happens, I hope that it will be made obvious how to obtain and install the JARs when they are no longer part of the stock Kafka distribution.
About Kafka Connect
Kafka Connect is a process, or set of processes, that runs alongside Kafka. The Kafka Connect infrastructure can be clustered, and it's common to run one Connect worker process on each node that runs a Kafka broker. Each worker will host one or more tasks, each assigned to a specific connector. In this simple example, however, I'll deploy Kafka and Kafka Connect on a single node. Kafka Connect will thus be running in what is known as 'standalone' mode, with one worker and -- in this case -- a single task.
The Kafka Connect infrastructure can manage many connectors, also known as 'plug-ins'. We typically obtain a connector as a Java JAR file, and install it in a specific directory that the Connect worker process searches at start-up. Source connectors provide data to Kafka, while sink connectors accept data from Kafka and send it somewhere else.
Running a simple connector has two parts. First, we run the Kafka Connect infrastructure itself. Then we will configure and instantiate a particular connector as a task within the infrastructure.
Note:
Strimzi, the implementation of Kafka on Kubernetes, includes an operator that takes care of configuring Kafka Connect in a cluster. Administering such an installation is likely to be completely different from the method I describe here, although the internal operation of Connect is similar.
1. Install a single-node Kafka
If you've installed Kafka already, you can skip this step, of course. However, I'm assuming a single-node installation; additional steps are needed to run Kafka Connect in a clustered mode, which I won't be describing here.
Note:
In this article I assume you're running Kafka on Linux; the exact flavour of Linux is not important.
Note:
I tested these instructions on Kafka 3.5, available from https://kafka.apache.org/downloads, and also on the Red Hat version, AMQ Streams for RHEL, version 2.5.
Unpack the Kafka distribution into a convenient directory, e.g., /opt/kafka. The following assumes you're using that directory. All the commands shown below assume that the Kafka installation directory is the working directory.
In this example, I am deliberately using a completely stock configuration for running Kafka. It should not be necessary to make any configuration changes except, perhaps, one: in some installations, you might need to add the Kafka libs/ directory, which contains the connector JARs, to the Kafka Connect plug-in directory list. For standalone operation, this configuration is in config/connect-standalone.properties.
Add/edit the line:
plugin.path=/opt/kafka/libs
If you get error messages about missing classes when you come to instantiate the connector, it is probably this configuration that needs attention.
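A quick way to check that the connector classes are available at all is to look for the relevant JAR in the libs/ directory. In the distributions I tested, the FileStream connectors ship in a JAR named connect-file-<version>.jar; the version number will, of course, match your installation:
$ ls /opt/kafka/libs/connect-file-*.jar
/opt/kafka/libs/connect-file-3.5.0.jar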
2. Start Kafka
Note:
By the time you read this, Kafka will probably be able to operate without Apache Zookeeper as a separate configuration store. At the time of writing, however, it's still usual to run Zookeeper.
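If your Kafka version already supports it and you want to try the Zookeeper-less ('KRaft') mode instead, the single-node start-up sequence looks roughly like this -- this is a sketch based on the stock config/kraft/server.properties, and details may vary between versions. If you go this route, skip the Zookeeper step below; nothing else in this article changes, because Kafka Connect only talks to the broker.
$ KAFKA_CLUSTER_ID=$(./bin/kafka-storage.sh random-uuid)
$ ./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
$ ./bin/kafka-server-start.sh config/kraft/server.properties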
Start Zookeeper:
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
In a separate terminal session, start the Kafka broker:
$ ./bin/kafka-server-start.sh config/server.properties
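Before going further, it's worth checking that the broker is actually accepting connections. One simple test is to list the (initially empty, or nearly empty) set of topics:
$ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list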
3. Start Kafka Connect in standalone mode
$ ./bin/connect-standalone.sh ./config/connect-standalone.properties
Even on a single node, it will take a little time for Connect to start, because it has to search for and inspect the available plug-ins. connect-standalone.properties provides overall configuration for the Connect infrastructure, and defaults for individual plug-ins that do not override them.
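For reference, the stock connect-standalone.properties contains settings along these lines (the exact contents vary slightly between Kafka versions). The converter settings here are the global defaults referred to later in this article:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000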
4. Check that Kafka Connect is responding on its REST interface
The REST interface is, by default, on port 8083, and is not encrypted
or authenticated.
For example, to query for installed connectors we can use curl
(or similar) like this:
$ curl localhost:8083/connectors
[]
This query should return an empty list ('[]') because, so far, we have installed no connectors.
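We can also ask which connector plug-ins the worker found on its plug-in path. If FileStreamSinkConnector is missing from this list, the plugin.path setting from step 1 probably needs attention. The output shown here is abbreviated, and the version strings will depend on your installation:
$ curl localhost:8083/connector-plugins
[{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"3.5.0"}, ...]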
5. Create a JSON file to specify the connector
Create a JSON file compatible with the REST API, for instantiating the FileStreamSink connector. Call the file, for example, my-connector.json.
{ "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max": "1", "topics": "foo", "file": "/tmp/foo.sink.txt" "errors.tolerance": "all", "value.converter": "org.apache.kafka.connect.storage.StringConverter" }
Note that this configuration reads from topic foo and writes to file /tmp/foo.sink.txt.
The attribute
errors.tolerance
indicates that the connector should
simply skip messages that it cannot process. value.converter
specifies a class to read the message data from the topic into an internal
representation. The default is to use the Kafka Connect infrastructure's
global default, which is set in the configuration file. In a stock installation,
this default is to treat the incoming data as JSON objects. It's simpler,
for testing purposes, just to work with raw text strings.
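One more preliminary may be worth mentioning: a stock broker will create the foo topic automatically the first time something writes to it, but if auto-creation has been disabled in your broker configuration, create the topic explicitly before going further:
$ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo --partitions 1 --replication-factor 1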
Use curl or similar to make a PUT request on the REST API, like this:
$ curl -X PUT --data-binary @my-connector.json -H "Content-Type: application/json" localhost:8083/connectors/kevin-file-sink/config
Note:
The Content-Type header is not optional here. The API won't work without it.
The name kevin-file-sink in the URI is arbitrary. We can use this name to administer and monitor the connector in later invocations of the REST API. It's also used in logs and diagnostics.
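The same name appears in the URIs of the other administrative operations on the REST API. For example, we can pause, resume, restart, or remove the connector like this:
$ curl -X PUT localhost:8083/connectors/kevin-file-sink/pause
$ curl -X PUT localhost:8083/connectors/kevin-file-sink/resume
$ curl -X POST localhost:8083/connectors/kevin-file-sink/restart
$ curl -X DELETE localhost:8083/connectors/kevin-file-sink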
6. Check that the connector has been registered
List the connectors, again using the REST API:
$ curl localhost:8083/connectors
["kevin-file-sink"]
Check that the connector is running:
$ curl localhost:8083/connectors/kevin-file-sink/status
{"name":"kevin-file-sink","connector":{"state":"RUNNING",...}}
Note:
As configured here, a connector installed through the REST API is not persistent; the installation will need to be repeated after restarting the Kafka Connect infrastructure.
The status API may, if we are lucky, report error messages related to the connector, in case of problems. In practice, though, we will have to look in Connect's main log file as well.
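Where that log ends up is governed by config/connect-log4j.properties. In the distributions I tested, the standalone worker logs to the console it was started from and, typically, to logs/connect.log under the installation directory, so something like this is a reasonable first check when a connector misbehaves:
$ grep -iE 'ERROR|kevin-file-sink' logs/connect.log | tail -n 20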
7. Test the connector
In a different terminal session, watch the contents of the connector's output file:
$ tail -f /tmp/foo.sink.txt
This file should exist as soon as the connector is running.
Now send some data to the Kafka topic using, for example, kafka-console-producer.sh. Remember that the connector is looking at the topic foo.
$ ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic foo
Enter some text at the prompt; each line will generate a new Kafka message. You should see these messages getting appended to the output text file.
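If nothing appears in the output file, it's worth confirming that the messages are at least reaching the topic, independently of the connector:
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo --from-beginning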
Installing the connector at start-up time
An alternative way to install the connector, which may be more appropriate in standalone mode, is to pass the connector specification as a properties file to connect-standalone.sh. For the file sink example, the properties file would look like this:
name=kevin-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/foo.sink.txt
topics=foo
errors.tolerance=all
value.converter=org.apache.kafka.connect.storage.StringConverter
These are (apart from the name) the same properties we used earlier in the JSON file, but in Java properties format.
Run Kafka Connect like this, specifying the properties file:
$ ./bin/connect-standalone.sh ./config/connect-standalone.properties my-connector.properties
Note:
For this stock connector, we can use FileStreamSink as the name, rather than the full Java class name.
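connect-standalone.sh will accept more than one connector properties file, so several connectors can be started in one go; the second file name here is just a hypothetical example:
$ ./bin/connect-standalone.sh ./config/connect-standalone.properties my-connector.properties another-connector.properties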
When we launch Kafka Connect like this, the result is the same as starting the Connect infrastructure, and then using the REST API to configure the connector. That is, if the connector definition is not persistent for the REST API, it won't be persistent when using a properties file, either.
Closing remarks
In this article I've explained how to install and run a simple Kafka Connect connector, starting with the installation of Kafka itself, and continuing up to testing that the connector is basically functional.
There is, of course, a huge amount more to Kafka Connect than this. In a production set-up, we would need to think about clustering and the distribution of connector work between nodes. This article hasn't said much about error detection and correction, and I've not even touched on how offsets are handled. Offsets matter, because we don't want the connector to reprocess every message each time it starts. If time allows, I might cover these topics in later articles.
In Part II, I'll explain how to do the same things this article demonstrated, but within an installation of Strimzi on OpenShift.