Using the rdkafka library to integrate a C program with Kafka messaging
The Kafka messaging platform is mostly associated with Java clients, but users of other languages need not miss all the fun. rdkafka is a set of libraries for C and C++ applications, supporting all the most important features of Kafka, including transactions and encryption. The library is maintained by Magnus Edenhill and others, and is loosely associated with Confluent which, of course, is a company that is very active in Kafka development. The Kafka client-server protocol is particular to Kafka, rather than being based on any open messaging standard, so a dedicated library will always be needed for Kafka clients.
This article provides a brief overview of rdkafka for C, based on a trivially simple program that just sends a few messages to Kafka.
Binary versions of rdkafka are available for many Linux versions, Windows, and macOS. However, it should be possible to build the library on any POSIX-like platform that has a C compiler. The source code is available on GitHub.
In this article I focus on the C library. The C library can be used with C++ programs but, of course, the converse is not true. Owing to the way the C library is implemented, the C and C++ APIs are very similar. The clean-up requirements are perhaps a bit easier to get right in the C++ version but, frankly, there's not a lot else to choose between them.
Note: Where I describe building the C program I'm mostly focusing on Unix-like systems. However, the rdkafka API itself is not platform-specific.
rdkafka is pretty well documented, for an open-source project. However, the sample programs use extra libraries which make them harder to follow than they should be -- the C API is actually easier to use than the examples make clear. In this article, I will describe how to code and build the simplest Kafka C program I could think of: one that sends a couple of text messages to Kafka. This example does not support authentication or encryption, so it will probably need to be tested on a local installation of Kafka. I might write another article covering these features in due course, if there seems to be enough interest.
I will describe the source code for the simple program only in outline. The full source code is available from my GitHub repository.
Prerequisites
To follow this article you will need:

- Some knowledge of Kafka and of C programming
- A locally-installed Kafka broker (or a remotely-installed Kafka that does not require authentication -- this article does not cover authentication)
- The rdkafka library and development headers. To get these for Red Hat/Fedora, run dnf install librdkafka-devel. See the rdkafka documentation for ideas on where to find binaries for other platforms
- A C compiler
- A text editor or IDE
- Some way to ensure that the messages sent by the C program are actually being received. For example, you could use the kafka-console-consumer utility that is part of Kafka
- About 30 minutes, if you already have Kafka set up
Overview of the rdkafka C library
rdkafka for C is implemented in a kind of object-oriented C, a popular approach in modern C libraries. Data structures are used to represent classes, each of which has a 'constructor' and a 'destructor'. For example, we use the rd_kafka_t object, which is the main entry point for both message production and consumption, like this:
rd_kafka_t *producer = rd_kafka_new (...); // Create the object
if (producer)
  {
  rd_kafka_destroy (producer); // Call the destructor
  }
"Methods" on these class-like structures are named after the
class itself, and take a reference to the object as the first
paramter (which is what happens in C++, but invisibly).
So to use the producer
object to send a message,
we can do:
if (producer)
  {
  rd_kafka_producev (producer, ...);
  }
We can think of this usage as calling the "method" producev() on the "class" rd_kafka. When I refer to "methods" in this article, this is what I have in mind. Of course, these are really just C functions with a particular usage convention.
Care needs to be taken with the clean-up of the "objects" allocated using the API. In general, every object that is created needs to be cleaned up; but some objects take ownership of other objects that are passed to them. When that happens, the "owner" object will clean up the dependent object when it, itself, is cleaned up. Trying to clean up these dependent objects again will likely cause a crash. The rdkafka documentation is reasonably clear about the situations where this happens, and needs to be read carefully.
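For example, a successful call to rd_kafka_new() takes ownership of the configuration object it is given, while a failed call does not. Here is a minimal sketch of handling both cases (the 256-byte error buffer size is an arbitrary choice):

char errstr[256];
rd_kafka_conf_t *conf = rd_kafka_conf_new ();
rd_kafka_t *producer = rd_kafka_new (RD_KAFKA_PRODUCER, conf,
  errstr, sizeof (errstr));
if (producer)
  {
  // 'conf' is now owned by 'producer'; destroying the producer
  //   cleans it up too, so don't call rd_kafka_conf_destroy() on it
  rd_kafka_destroy (producer);
  }
else
  {
  // Creation failed, so ownership was not transferred
  fprintf (stderr, "Can't create producer: %s\n", errstr);
  rd_kafka_conf_destroy (conf);
  }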
As with any C program, it's well worth testing with a memory checker like valgrind, because it's all too easy to miss some aspect of clean-up, and end up with a nasty memory leak.
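For example, assuming the compiled program is called myprog:

$ valgrind --leak-check=full ./myprog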
Initialization
Initialization is done in rdkafka in a very similar way to the Java client. Producers and consumers have their own initialization, based on name-value pairs. In rdkafka we create an object of type rd_kafka_conf_t, and initialize it with specific values, like this:
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set (conf, "bootstrap.servers", "localhost:9092", ...);
Note: The ellipses [...] in these listings indicate places where I've left out code that is not necessary for the explanation; in this case it is related to error handling. Of course, the full source code does not omit these elements.
Note: "bootstrap.servers" is the only compulsory configuration setting. However, it's advisable to allow users to set other values, perhaps from a configuration file, as the exact settings for best performance often cannot be known at compilation time.
The conf object is then passed as an argument when creating the rd_kafka object for producing messages.
Sending a message
There are various ways to send a message. The simplest is probably to call rd_kafka_producev(), which takes a topic name, a key, and a value. This method will return very quickly because, in fact, it does not wait for the message to be sent. The actual send happens asynchronously.
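Here is a minimal sketch of such a call, assuming a topic named my-topic and short text strings for the key and value; RD_KAFKA_MSG_F_COPY tells the library to take its own copy of the data:

rd_kafka_resp_err_t err = rd_kafka_producev (producer,
  RD_KAFKA_V_TOPIC ("my-topic"),
  RD_KAFKA_V_KEY ("key", strlen ("key")),
  RD_KAFKA_V_VALUE ("Hello", strlen ("Hello")),
  RD_KAFKA_V_MSGFLAGS (RD_KAFKA_MSG_F_COPY),
  RD_KAFKA_V_END); // Marks the end of the argument list
if (err)
  fprintf (stderr, "Can't queue message: %s\n", rd_kafka_err2str (err));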
To wait for all pending sends to complete, we can do this:
rd_kafka_flush (producer, 1000); // milliseconds
If there is a problem communicating with the Kafka server, it is most likely at this point that it will become obvious -- messages will remain queued and never get sent. We can check for this kind of error by testing the producer's queue length which, after the flush, ought to be zero:
int unsent = rd_kafka_outq_len (producer);
if (unsent > 0)
  {
  // Handle error -- some messages are unsent
  }
Error handling and logging
The rdkafka API uses a number of different error-handling mechanisms. First, there is the traditional return-value method that is widespread in C programming. With this approach, a function call returns zero if it succeeds, and a number that indicates the cause of the error if it does not. The rd_kafka_producev() method, for example, works this way.
The function rd_kafka_err2str() provides a textual representation of the error code, if necessary.
Some functions provide a textual error message directly. For example, the rd_kafka_new() function takes a char * as an argument, along with an integer size argument. In the event of error, the buffer that the char * references will be filled in with text, up to the limit set by the length.
Finally, the rdkafka library logs error conditions directly to stderr. A simple program, therefore, could get along without doing any error reporting at all. Of course, it would still have to test for errors, and take action accordingly. In a more complex application, you'd probably want to capture the rdkafka error messages, and combine them with other logging into a single reporting mechanism. The rd_kafka_conf_set_log_cb() function sets a new handler for library-generated log messages.
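Here is a minimal sketch of such a handler; the name my_log_cb is arbitrary, and a real application might forward the messages to its own logging framework rather than just tagging them:

static void my_log_cb (const rd_kafka_t *rk, int level,
    const char *fac, const char *buf)
  {
  // 'level' is a syslog-style severity; 'fac' identifies the facility
  fprintf (stderr, "[rdkafka:%d] %s: %s\n", level, fac, buf);
  }

// ...then, before creating the producer:
rd_kafka_conf_set_log_cb (conf, my_log_cb);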
Compiling the simple C program
All that needs to be done, apart from the usual compilation steps, is to link with the rdkafka library. On Linux, for example:
$ gcc -o myprog myprog.c -lrdkafka
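If the library's pkg-config metadata is installed, it should also be possible to obtain the necessary compiler and linker flags that way:

$ gcc -o myprog myprog.c $(pkg-config --cflags --libs rdkafka)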
Testing
My test application writes to a particular topic on a particular Kafka installation. To run it, you'd first need to edit the source code to define the Kafka bootstrap URI and the topic. Then you would need some way to check that messages are actually being produced correctly. For example, you could log into one of the Kafka broker hosts and run kafka-console-consumer.sh directly, like this:
$ /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server :9092 --topic my-topic
Hello
World
...
rdkafka licensing
rdkafka is an open-source library, made from components maintained by different contributors. Its licence is a little more complex than will be found in a project that is covered by a single, global licence like GPL. All the components are licensed to allow distribution in source and binary form, so in most cases this slight additional complexity won't be a problem. However, it's probably advisable to read the various licence documents in the source code, if you're planning a substantial project based on rdkafka.
Serializers and compatibility
If you're familiar with the Kafka client library for Java, you might be struck by the lack of mention of serializers in rdkafka.
Fundamentally, Kafka keys and values are just sequences of bytes --
Kafka does not impose any particular structure on them. It's up to the
clients to agree on how these bytes should be interpreted. This can be
problematic when the Kafka clients are based on different programming
platforms.
Java requires serializers because the internal representation of data
types in the JVM is opaque. For example, Java stores String
objects in a format that is based on UTF-16 encoding but, in fact, this
format is never directly exposed. To store a Java string in some other
location, external to the JVM, requires it to be converted to some
known format. So when we use the StringSerializer
class in
a Java Kafka application, we're actually converting Java strings to
some other format. As it happens, this format is UTF-8. I stress that
this UTF-8 convention is imposed by the client
-- Kafka does
not care how data is represented.
The simple text program I have used for this article writes a string in, essentially, ASCII format. ASCII is a subset of UTF-8 and, therefore, a Java-based Kafka application would read the message payloads sent by the C program correctly. UTF-8 is a good choice for a string encoding, because it is so widely supported. However, I want to reiterate that Kafka itself imposes no structure on the data it carries.
Java applications very often use Apache Avro
to serialize complex data structures. Avro is widely used with Kafka,
and the Kafka client library for Java has built-in support for this.
An Avro library is also available for C, although it does not yet support
the same range of platforms that rdkafka
does. The
problem is that C data types are not opaque, as Java's are: C developers
are used to working with raw data in memory. This means that
conversion from one platform to another, or even from one C compiler to
another on the same platform, is often problematic.
In short: unless your application is developed to work with raw bytes, you'll have to be careful to ensure that your Java and C applications can use a common format for data representation. XML encoded with UTF-8 is a pretty safe choice, but it's a lot less efficient than Avro's binary encoding.
Closing remarks
If you're familiar with programming for Kafka in Java, you'll find few surprises in rdkafka -- except, perhaps, the absence of serializers. Even without that experience, rdkafka is pretty easy to use -- at least for simple applications.