Kafka Connect from the ground up, part II
In this article I describe how to do what I think is the simplest possible thing with Kafka Connect (KC), under Strimzi on OpenShift. The steps I describe should work on other Kubernetes platforms, but OpenShift is what I've tested. This article follows directly from Part I, which was about doing the simplest possible thing on bare metal. Using Strimzi on OpenShift/Kubernetes is much more complicated, for reasons I will explain. However, if you followed part I, you will see much that is familiar. In fact, if you haven't read Part I, and you know little-to-nothing about Kafka Connect, I would recommend reading it and, ideally, following the steps yourself.
For the record, I take the 'simplest possible thing' to be reading messages from a Kafka topic, and writing them into a plain text file. While there might have been some real-world application for such a process on bare metal, there's probably no need ever to use it in a container, other than for education. However, the method of configuration I describe here is equally appropriate for more practical uses of Kafka Connect, particularly to run Debezium.
You may remember from Part I that Kafka Connect consists of a framework
that runs tasks, and a set of plug-ins. You can run the
framework with no plug-ins but, other than to see what happens,
there's little reason to do so. However, some bare-metal installations of
Kafka include the FileStream plug-in by default, so the bare framework
does have some application, albeit a trivial one. So far as I know,
the stock Kafka Connect framework on Strimzi does not include even the
FileStream plug-in, so even a 'Hello, World' use of
KC will require a bit of set-up.
We will be working in a newly-created Kubernetes namespace called
kafka. I've already made a stock, default installation of Strimzi
-- three broker pods and three Zookeeper pods. I'm not going to explain
how to set this up -- it's well-documented elsewhere.
So, at the start of the exercise, we have the following setup:
$ kubectl config set-context --current --namespace=kafka
$ kubectl api-resources |grep Kafka
kafkaconnectors   kctr   kafka.strimzi.io/v1beta2   true   KafkaConnector
kafkaconnects     kc     kafka.strimzi.io/v1beta2   true   KafkaConnect
...
$ kubectl get pods
NAME                                        READY   STATUS    RESTARTS   AGE
my-cluster-entity-operator-b94b44c7-cqqlb   3/3     Running   0          105m
my-cluster-kafka-0                          1/1     Running   0          106m
my-cluster-kafka-1                          1/1     Running   0          106m
my-cluster-kafka-2                          1/1     Running   0          106m
my-cluster-zookeeper-0                      1/1     Running   0          106m
my-cluster-zookeeper-1                      1/1     Running   0          106m
my-cluster-zookeeper-2                      1/1     Running   0          106m
In particular, the custom API resources KafkaConnect and
KafkaConnector are available -- these are what we will
use to configure Kafka Connect.
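If you'd rather check for these resource types in a script than by eye, something like the following sketch should work. It assumes the -o name output format of kubectl api-resources, which prints one name.group entry per line; the helper name has_strimzi_resource is my own invention, for illustration only.

```shell
# Hypothetical helper: succeeds if the named Strimzi resource type
# appears in a list of "name.group" entries read from standard input.
has_strimzi_resource() {
  grep -qx "$1\.kafka\.strimzi\.io"
}

# Intended use, against a live cluster:
#   kubectl api-resources --api-group=kafka.strimzi.io -o name \
#     | has_strimzi_resource kafkaconnectors \
#     && echo "KafkaConnector is available"
```

The match is on the whole line, so checking for kafkaconnects will not be satisfied by kafkaconnectors, or vice versa.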
Configuring the Kafka Connect infrastructure
We will set up the infrastructure by deploying a
KafkaConnect custom resource. As a minimum, we need to
include the FileStream plug-in within the infrastructure
for this example.
On bare metal we enabled plug-ins by locating the plug-in path in the configuration file, and dropping the necessary JAR files into the specified directory. We can't do this on Kubernetes, because the pod's filesystem is immutable. Instead, we have to instruct the Kafka Connect infrastructure to build an entirely new container image, that contains the basic infrastructure and the JAR file (or files) that comprise the plug-in.
This process is reminiscent of the 'source-to-image' process that was widely used to deploy, for example, applications based on the Wildfly application server. The generated image contains all the same stuff as the base image, and some additional bits.
Here is a suitable YAML file to set up KC with the
FileStream plug-in in place
(see comments below).
kind: KafkaConnect
apiVersion: kafka.strimzi.io/v1beta2
metadata:
  name: my-connect-cluster
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.5.0
  replicas: 1
  bootstrapServers: 'my-cluster-kafka-bootstrap:9093'
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  build:
    output:
      image: image-registry.openshift-image-registry.svc:5000/kafka/my-connect-cluster:latest
      type: docker
    plugins:
      - artifacts:
          - type: jar
            url: https://repo1.maven.org/maven2/org/apache/kafka/connect-file/3.4.0/connect-file-3.4.0.jar
        name: connect-file
There are a few things to note about this specification. This annotation:
annotations:
  strimzi.io/use-connector-resources: "true"
prepares the generated KC image to respond to configuration by
custom resources, not just the REST API. We can use the
REST API on Kubernetes, just as we did on bare metal, but using
custom resources is more idiomatic. With this annotation, we will
be able to deploy resources of type KafkaConnector
(more on this later).
Watch out for the closeness of the names of the custom resources here: a
KafkaConnect defines the KC infrastructure, while a
KafkaConnector defines a particular connector within a KC installation.
The build: part of the configuration tells the infrastructure
to build a new image and store it in a specific repository. On OpenShift,
this can be the built-in image repository, as in my example, but other
alternatives are possible. The image will contain the required plug-in,
along with all the infrastructure components. I've specified the plug-in
JAR as a package in a public Maven repository in this case.
Finally, note that some of the names in the YAML, like
my-cluster-kafka-bootstrap, match the pre-existing Kafka
installation. If you didn't use the stock installation, you'll need
to review the YAML to check that everything matches.
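One way to do that check before applying anything is to ask Kubernetes whether the objects named in the YAML actually exist. This is only a sketch: the function name check_names is hypothetical, and I'm assuming that the bootstrap address refers to a Service and the CA certificate to a Secret, both in the current namespace.

```shell
# Hypothetical helper: succeeds only if both objects referenced by the
# KafkaConnect YAML can be found in the current namespace.
check_names() {
  kubectl get service my-cluster-kafka-bootstrap >/dev/null &&
  kubectl get secret my-cluster-cluster-ca-cert >/dev/null
}

# Intended use:
#   check_names && echo "names match the existing installation"
```

If either lookup fails, kubectl prints its own error message, which should say which name is the problem.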
To deploy the infrastructure, just run
kubectl apply -f on the
YAML file. You'll see the builder pod get created, which will have a name like
my-connect-cluster-connect-build-1-build. It could take
a few minutes for this pod to do its work. When it's finished, you should
see something like this:
$ kubectl get pods
NAME                                          READY   STATUS      RESTARTS   AGE
my-connect-cluster-connect-856f4896c5-k6c5r   1/1     Running     0          1m
my-connect-cluster-connect-build-1-build      0/1     Completed   0          10m
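Rather than running kubectl get pods repeatedly until the build finishes, it should be possible to block until the whole thing is ready. The sketch below assumes that Strimzi reports a Ready condition in the status of the KafkaConnect resource, which I believe it does; the function name and the ten-minute default timeout are my own choices.

```shell
# Hypothetical helper: block until the named KafkaConnect resource
# reports the Ready condition, or the timeout expires.
# $1: name of the KafkaConnect resource
# $2: timeout, in kubectl's duration format (defaults to 600s)
wait_for_connect() {
  kubectl wait "kafkaconnect/$1" --for=condition=Ready --timeout="${2:-600s}"
}

# Intended use:
#   wait_for_connect my-connect-cluster
```

The command exits with a non-zero status if the timeout expires first, so it can be used in a script that goes on to deploy connectors.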
The pod my-connect-cluster-connect-NNNN runs the customized
KC infrastructure, containing the FileStream plug-in.
Let's log in
to this pod and look at the filesystem.
$ kubectl exec my-connect-cluster-connect-856f4896c5-k6c5r -it -- /bin/sh
$ ls -lR /opt/kafka/plugins/
...
/opt/kafka/plugins/connect-file/018a2cb1:
total 16
-rw-r--r--. 1 root root 15749 Dec 18 14:16 018a2cb1.jar
The file 018a2cb1.jar is derived from the original
connect-file-3.4.0.jar specified in the custom resource. If you list
its contents, you'll see the classes that make up the FileStream plug-in.
The command that actually runs the KC infrastructure is
If you look at the automatically-generated configuration file
/tmp/strimzi-connect.properties you'll see that it sets
plugin.path to /opt/kafka/plugins, which is
where the plug-ins we specified in the YAML file have ended up
(albeit with different names).
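If you just want to pull one setting out of the generated file, rather than read all of it, a small helper like the one below will do. It's a sketch that assumes awk is available in the pod's image, and that the plug-in path is set using the standard Kafka Connect key plugin.path; the name get_prop is hypothetical.

```shell
# Hypothetical helper: print the value of one key from a Java-style
# properties file. Only lines of the form key=value are considered.
# $1: the key to look up
# $2: the path to the properties file
get_prop() {
  awk -F= -v key="$1" '$1 == key { sub(/^[^=]*=/, ""); print }' "$2"
}

# Intended use, inside the KC pod:
#   get_prop plugin.path /tmp/strimzi-connect.properties
```

The key is compared as a literal string, so the dots in names like plugin.path are not treated as wildcards.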
Leave a session logged into this pod -- we'll need to watch the output file of the
FileStream plug-in, to see that the plug-in is working.
This method of specifying which plug-ins to use is complicated, when all we
want to do is run the
FileStream plug-in. However, it doesn't
get any more complicated in 'real' installations. If we were running
Debezium, for example, for change-data capture, we would need to include
database drivers as well as the Debezium plug-in, and maybe supporting
JARs as well. The method for doing this is exactly as we used in this
example, but with different JAR files.
Running the connector
So, all being well, we should have the KC infrastructure running in a single Kubernetes pod, which is running an image that contains the plug-in we want to use. We now have to create a specific connector that uses this plug-in.
In Part I, I showed how to use the KC REST API to send a JSON file that
contained the connector specification. That method will work here as
well, but we don't need to use it: we can use KafkaConnector
custom resources instead. Here is an example that uses the
FileStream plug-in we installed earlier.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-file-sink
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSinkConnector
  tasksMax: 1
  config:
    file: "/tmp/foo.sink.txt"
    topics: "foo"
    value.converter: "org.apache.kafka.connect.storage.StringConverter"
The resource provides the name of the connector, and the Kafka Connect
cluster it will be assigned to (my-connect-cluster).
It indicates the class name of the plug-in, and some configuration
for that plug-in. In this case, the configuration says to read from the
topic foo, and write to the file /tmp/foo.sink.txt.
The value.converter setting says that the messages should
be interpreted as plain text strings -- the default would be JSON.
You could leave the default, of course, if the messages did actually contain
JSON.
Install the connector by running
kubectl apply -f on the
YAML file. This won't create any new pods -- it will just add a task to the
connector pod's workload.
We can check the status of the connector like this:
$ kubectl get kafkaconnectors
NAME           CLUSTER              CONNECTOR CLASS                                         MAX TASKS   READY
my-file-sink   my-connect-cluster   org.apache.kafka.connect.file.FileStreamSinkConnector   1           True
All the usual ways of managing custom resources, not just get, will work on KC connectors. In practice, these Kubernetes API invocations get transformed into operations on the REST API. Probably that's a detail you'll only notice if there are errors, when the REST URL might appear in the log file of the KC pod.
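For example, it should be possible to read the connector's state straight out of the custom resource, without touching the REST API. The sketch below assumes that Strimzi copies the REST API's status report into a connectorStatus field in the resource's status, which is my understanding of how it works; the function name connector_state is hypothetical.

```shell
# Hypothetical helper: print the state of the named connector, as
# recorded in the status of its KafkaConnector custom resource.
# $1: name of the KafkaConnector resource
connector_state() {
  kubectl get kafkaconnector "$1" \
    -o jsonpath='{.status.connectorStatus.connector.state}'
}

# Intended use:
#   connector_state my-file-sink
```

If the field path turns out to be different in your Strimzi version, kubectl get kafkaconnector my-file-sink -o yaml will show the whole status, and you can adjust the path accordingly.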
Testing the connector
Log into the Kafka Connect pod. You should see that the connector's output file has been created:
$ ls /tmp
foo.sink.txt
...
$ tail -f /tmp/foo.sink.txt
The tail -f command will display any new data that gets
written to the file.
To test the connector, we need to write messages to the topic
foo. There are many ways to do that, but the easiest
-- for testing purposes -- is just to log into one of the Kafka broker
pods, and run the console producer:
$ kubectl exec my-cluster-kafka-0 -it -- /bin/sh
sh-4.4$ ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic foo
> Hello, World!
All being well, you should see the message you enter at the prompt appear in the connector's output file.
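If you want to repeat this test without an interactive session, the console producer will read messages from standard input, so a message can be piped in through kubectl exec. This is a sketch under the same assumptions as the interactive example: the broker pod is my-cluster-kafka-0, and the plain listener is on port 9092. The function name send_message is my own.

```shell
# Hypothetical helper: send one message to a topic by piping it into
# the console producer on the first broker pod.
# $1: topic name
# $2: message text
send_message() {
  printf '%s\n' "$2" | kubectl exec -i my-cluster-kafka-0 -- \
    ./bin/kafka-console-producer.sh \
    --bootstrap-server localhost:9092 --topic "$1"
}

# Intended use:
#   send_message foo 'Hello, World!'
```

Note the -i switch without -t: we want standard input passed through to the producer, but there is no terminal to attach.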
Using the REST API
The REST API that I outlined in Part I should still work, although you'll
probably need to log into a pod to invoke it. Unless you changed it at
installation time, the REST API port will be 8083, as in the bare metal
installation. So, to invoke the
connectors API and get a list
of installed connectors, we can log into the KC pod and run this:
$ curl `hostname`:8083/connectors
...
["my-file-sink"]
You probably won't be able to use
localhost here -- in recent Strimzi versions the API port does not bind to the loopback address.
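The same API has a status endpoint for each connector, /connectors/NAME/status, which returns a JSON document. Here is a rough sketch of how to pick out the connector's state using only basic text tools, since I wouldn't count on a JSON parser being installed in the pod. It assumes that the JSON is compact (no spaces around the colons), and that the connector's own state appears before the states of its tasks; the function name first_state is hypothetical.

```shell
# Hypothetical helper: read a Kafka Connect status document on standard
# input, and print the first "state" value found in it. With the usual
# ordering of the document, that is the state of the connector itself.
first_state() {
  grep -o '"state":"[A-Z_]*"' | head -n 1 | cut -d'"' -f4
}

# Intended use, inside the KC pod:
#   curl -s "$(hostname):8083/connectors/my-file-sink/status" | first_state
```

This is no substitute for a real JSON parser, but it's good enough for a quick check that the connector is running.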
While the REST API works, you should bear in mind that this API is also
used by the KC infrastructure for Strimzi integration. It's probably best
not to mix REST-based administration with the use of custom resources.
Running Kafka Connect in a container environment is conceptually the same as running it on bare metal. We still run the KC framework with particular plug-ins enabled; we can still use the REST API to configure it.
The additional complexity of using Strimzi stems from the 'cattle, not pets' philosophy of Kubernetes. We can't just install plug-ins by copying JAR files into the container, because the container filesystem is immutable. On the positive side, the complexity does not get any worse -- on the conceptual level -- when we use KC for real applications (like Debezium). On a practical level, however, we still have to make the necessary JARs available in repositories where the KC build framework can find them. It can, frankly, all be a bit tedious.