Kafka Connect from the ground up, part II

Kafka logo In this article I describe how to do what I think is the simplest possible thing with Kafka Connect (KC), under Strimzi on OpenShift. The steps I describe should work on other Kubernetes platforms, but OpenShift is what I've tested. This article follows directly from Part I, which was about doing the simplest possible thing on bare metal. Using Strimzi on OpenShift/Kubernetes is much more complicated, for reasons I will explain. However, if you followed part I, you will see much that is familiar. In fact, if you haven't read Part I, and you know little-to-nothing about Kafka Connect, I would recommend reading it and, ideally, following the steps yourself.

For the record, I take the 'simplest possible thing' to be reading messages from a Kafka topic, and writing them into a plain text file. While there might have been some real-world application for such a process on bare metal, there's probably no need ever to use it in a container, other than for education. However, the method of configuration I describe here is equally appropriate for more practical uses of Kafka Connect, particularly to run Debezium.

You may remember from Part 1 that Kafka Connect consists of a framework that runs tasks, and a set of plug-ins. You can run the framework with no plug-ins but, other than to see what happens, there's little reason to do so. However, some bare-metal installations of Kafka enable the FileStream plug-in by default, so the bare framework does have some application, albeit a trivial one. So far as I know, the stock Kafka Connect framework on Strimzi does not include even the FileStream plug-in, so even a 'Hello, World' use of KC will require a bit of set-up.

Prerequisites

We will be working in a newly-created Kubernetes namespace called kafka. I've already made a stock, default installation of Strimzi -- three broker pods and three Zookeeper pods. I'm not going to explain how to set this up -- it's well-documented elsewhere.

So, at the start of the exercise, we have the following setup:

$ kubectl config set-context --current --namespace=kafka

$ kubectl api-resources |grep Kafka
kafkaconnectors                       kctr                kafka.strimzi.io/v1beta2                      true         KafkaConnector
kafkaconnects                         kc                  kafka.strimzi.io/v1beta2                      true         KafkaConnect
...

$ kubectl get pods
NAME                                          READY   STATUS      RESTARTS   AGE
my-cluster-entity-operator-b94b44c7-cqqlb     3/3     Running     0          105m
my-cluster-kafka-0                            1/1     Running     0          106m
my-cluster-kafka-1                            1/1     Running     0          106m
my-cluster-kafka-2                            1/1     Running     0          106m
my-cluster-zookeeper-0                        1/1     Running     0          106m
my-cluster-zookeeper-1                        1/1     Running     0          106m
my-cluster-zookeeper-2                        1/1     Running     0          106m

In particular, the custom API resources KafkaConnect and KafkaConnector are available -- these are what we will use to configure Kafka Connect.

Configuring the Kafka Connect infrastructure

We will set up the infrastructure by deploying a KafkaConnect custom resource. As a minimum we need to enable the FileStream plug-in within the infrastructure for this example.

On bare metal we enabled plug-ins by locating the plug-in path in the configuration file, and dropping the necessary JAR files into the specified directory. We can't do this on Kubernetes, because the pod's filesystem is immutable. Instead, we have to instruct the Kafka Connect infrastructure to build an entirely new container image, that contains the basic infrastructure and the JAR file (or files) that comprise the plug-in.

This process is reminiscent of the 'source-to-image' process that was widely used to deploy, for example, applications based on the Wildfly application server. The generated image contains all the same stuff as the base image, and some additional bits.

Here is a suitable YAML file to set up KC with the FileStream plug-in in place (see comments below).

kind: KafkaConnect
apiVersion: kafka.strimzi.io/v1beta2
metadata:
  name: my-connect-cluster
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.5.0
  replicas: 1
  bootstrapServers: 'my-cluster-kafka-bootstrap:9093'
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  build:
    output:
      image: image-registry.openshift-image-registry.svc:5000/kafka/my-connect-cluster:latest
      type: docker
    plugins:
      - artifacts:
        - type: jar
          url: https://repo1.maven.org/maven2/org/apache/kafka/connect-file/3.4.0/connect-file-3.4.0.jar
        name: connect-file

There are a few things to note about this specification. This annotation:

  annotations:
    strimzi.io/use-connector-resources: "true"

prepares the generated KC image to respond to configuration by custom resources, not just the REST API. We can use the REST API on Kubernetes, just as we did on bare metal, but using custom resources is more idiomatic. With this annotation, we will be able to deploy resources of type KafkaConnector (more on this later).

Note:
Watch out for the closeness of the names of the custom resources here: a KafkaConnect defines the KC infrastructure, while a KafkaConnector defines a particular connector within a KC installation.

The build: part of the configuration tells the infrastructure to build a new image and store it in a specific repository. On OpenShift, this can be the built-in image repository, as in my example, but other alternatives are possible. The image will contain the required plug-in, along with all the infrastructure components. I've specified the plug-in JAR as a package in a public Maven repository in this case.

Finally, note that some of the names in the YAML, like my-cluster-kafka-bootstrap match the pre-existing Kafka installation. If you didn't use the stock installation, you'll need to review the YAML to check that everything matches.

To deploy the infrastructure, just run kubectl apply -f on the YAML file. You'll see the builder pod get created, which will have a name like my-connect-cluster-connect-build-1-build. It could take a few minutes for this pod to do its work. When it's finished, you should see this:

$ kubectl get pods
NAME                                          READY   STATUS      RESTARTS   AGE
my-connect-cluster-connect-856f4896c5-k6c5r   1/1     Running     0          1m
my-connect-cluster-connect-build-1-build      0/1     Completed   0          10m

The pod my-connect-cluster-connect-NNNN runs the customized KC infrastructure, containing the FileStream plug-in. Let's log in to this pod and look at the filesystem.

$ kubectl exec my-connect-cluster-connect-856f4896c5-k6c5r -it -- /bin/sh
$ ls -lR /opt/kafka/plugins/
...
/opt/kafka/plugins/connect-file/018a2cb1:
total 16
-rw-r--r--. 1 root root 15749 Dec 18 14:16 018a2cb1.jar

The file 018a2cb1.jar is derived from the original connect-file-3.4.0.jar specified in the customer resource. If you run unzip -l on it, you'll see the classes that make up the FileStream plug-in.

The command that actually runs the KC infrastructure is

/opt/kafka/bin/connect-distributed.sh /tmp/strimzi-connect.properties

If you look at the automatically-generated configuration file /tmp/strimzi-connect.properties you'll see that it sets plugin.path to /opt/kafka/plug-ins, which is where the plugins we specified in the YAML file have ended up (albeit with different names).

Note:
Leave a session logged into this pod -- we'll need to watch the output file of the FileStream plug-in, so see that the plug-in is working.

This method of specifying which plug-ins to use is complicated, when all we want to do is run the FileStream plug-in. However, it doesn't get any more complicated in 'real' installations. If we were running Debezium, for example, for change-data capture, we would need to include database drivers as well as the Debezium plug-in, and maybe supporting JARs as well. The method for doing this is exactly as we used in this example, but with different JAR files.

Running the connector

So, all being well, we should have the KC infrastructure running in a single Kubernetes pod, which is hosting on an image that contains the plug-in we want to use. We now have to create a specific connector that uses this plug-in.

In Part I I showed how to use the KC REST API, to send a JSON file that contained the connector specification. That method will work here as well, but we don't need to: we can use KafkaConnector customer resources. Here is an example, that uses the FileStream plug-in we installed earlier.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-file-sink
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSinkConnector
  tasksMax: 1
  config:
    file: "/tmp/foo.sink.txt"
    topics: "foo"
    value.converter: "org.apache.kafka.connect.storage.StringConverter"

The resource provides the name of the connector, and the Kafka Connect cluster it will be assigned to (my-connect-cluster). It indicates the class name of the plug-in, and some configuration for that plug-in. In this case, the configuration says to read from the Kafka topic foo, and write to the file /tmp/foo.sink.txt. The value.converter setting says that the messages should be interpreted as plain text strings -- the default would be JSON files. You could leave the default, of course, if the messages did actually contain JSON files.

Install the connector by running kubectl apply -f on the YAML file. This won't create any new pods -- it will just add a task to the connector pod's workload. We can check the status of the connector like this:

$ kubectl get kafkaconnectors
NAME           CLUSTER              CONNECTOR CLASS                                         MAX TASKS   READY
my-file-sink   my-connect-cluster   org.apache.kafka.connect.file.FileStreamSinkConnector   1           True
Note:
All the usual ways of managing custom resources will work on KC connectors as well as get. In practice, these Kubernetes API invocations get transformed into operations on the REST API. Probably that's a detail you'll only notice if there are errors, when the REST URL might appear in the log file of the KC pod.

Testing the connector

Log into the Kafka Connect pod. You should see that the connector's output file has been created:

$ ls /tmp
foo.sink.txt ...

$ tail -f /tmp/foo.sink.txt

The tail -f command will display any new data that gets written to the file.

To test the connector, we need to write messages to the topic foo. There are many ways to do that, but the easiest -- for testing purposes -- is just to log into one of the Kafka broker pods, and run kafka-console-producer.sh.

$ kubectl exec my-cluster-kafka-0 -it -- /bin/sh
sh-4.4$ ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic foo
> Hello, World!

All being well, you should see the message you enter at the prompt appear in the connector's output file.

Using the REST API

The REST API that I outlined in Part I should still work, although you'll probably need to log into a pod to invoke it. Unless you changed it at installation time, the REST API port will be 8083, as in the bare metal installation. So, to invoke the connectors API and get a list of installed connectors, we can log into the KC pod and run this:

$ curl `hostname`:8083/connectors
...
["my-file-sink"]
Note:
You probably won't be able to use localhost here -- in recent Strimzi versions the API port does not bind to the loopback address.

While the REST API works, you should bear in mind that this API is also used by the KC infrastructure for Strimzi integration. It's probably best not to mix REST-based administration with the use of KafkaConnector custom resources.

Closing remarks

Running Kafka Connect in a container environment is conceptually the same as running it on bare metal. We still run the KC framework with particular plug-ins enabled; we can still use the REST API to configure it.

The additional complexity of using Strimzi stems from the 'cattle, not pets' philosophy of Kubernetes. We can't just install plug-ins by copying JAR files into the container, because the container filesystem is immutable. On the positive side, the complexity does not get any worse -- on the conceptual level -- when we use KC for real applications (like Debezium). On a practical level, however, we still have to make the necessary JARs available in repositories where the KC build framework can find them. It can, frankly, all be a bit tedious.