Using the rdkafka library to integrate a C program with Kafka messaging

The Kafka messaging platform is mostly associated with Java clients, but users of other languages need not miss out on the fun. rdkafka is a set of libraries for C and C++ applications, supporting all the most important features of Kafka, including transactions and encryption. The library is maintained by Magnus Edenhill and others, and is loosely associated with Confluent which, of course, is a company that is very active in Kafka development. The Kafka client-server protocol is specific to Kafka, rather than being based on any generic messaging standard, so a dedicated client library will always be needed for Kafka clients.

This article provides a brief overview of rdkafka for C, based on a trivially simple program that just sends a few messages to Kafka.

Binary versions of rdkafka are available for many Linux distributions, Windows, and macOS. However, it should be possible to build the library on any POSIX-like platform that has a C compiler. The source code is available on GitHub.

In this article I focus on the C library. The C library can be used with C++ programs but, of course, the converse is not true. Owing to the way the C library is implemented, the C and C++ APIs are very similar. The clean-up requirements are perhaps a bit easier to get right in the C++ version but, frankly, there's not a lot else to choose between them.

Note:
Where I describe building the C program I'm mostly focusing on Unix-like systems. However, the rdkafka API itself is not platform-specific.

rdkafka is pretty well documented, for an open-source project. However, the sample programs use extra libraries which make them harder to follow than they should be -- the C API is actually easier to use than the examples make clear. In this article, I will describe how to code and build the simplest Kafka C program I could think of: one that sends a couple of text messages to Kafka. This example does not support authentication or encryption, so will probably need to be tested on a local installation of Kafka. I might write another article covering these features in due course, if there seems to be enough interest.

I will describe the source code for the simple program only in outline. The full source code is available from my GitHub repository.

Prerequisites

To follow this article you will need a C compiler, the rdkafka library and its header files, and access to a Kafka installation to test against -- most likely a local one, since the example program does not support authentication or encryption.

Overview of the rdkafka C library

rdkafka for C is implemented in a kind of object-oriented C, a popular approach in modern C libraries. Data structures are used to represent classes, each of which has a 'constructor' and a 'destructor'. For example, we use the rd_kafka_t object, which is the main entry point for both message production and consumption, like this:

 rd_kafka_t *producer = rd_kafka_new (...); // Create the object
 if (producer)
   {

   rd_kafka_destroy (producer); // Call the destructor
   }

"Methods" on these class-like structures are named after the class itself, and take a reference to the object as the first paramter (which is what happens in C++, but invisibly). So to use the producer object to send a message, we can do:

 if (producer)
   {
   rd_kafka_producev (producer, ...);
   }

We can think of this usage as calling the "method" producev() on the "class" rd_kafka. When I refer to "methods" in this article, this is what I have in mind. Of course, these are really just C functions with a particular usage convention.

Care needs to be taken with the clean-up of the "objects" allocated using the API. In general, every object that is created needs to be cleaned up; but some objects take ownership of other objects that are passed to them. When that happens, the "owner" object will clean up the dependent object when it, itself, is cleaned up. Trying to clean up these dependent objects again will likely cause a crash. The rdkafka documentation is reasonably clear about the situations where this happens, and needs to be read carefully.

As with any C program, it's well worth testing with a memory checker like valgrind, because it's all too easy to miss some aspect of clean-up, and end up with a nasty memory leak.
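
For example, assuming the compiled program is called myprog, as in the compilation section below, a basic leak check might look like this:

$ valgrind --leak-check=full ./myprog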

Initialization

Initialization is done in rdkafka in a very similar way to the Java client. Producers and consumers have their own initialization, based on name-value pairs. In rdkafka we create an object of type rd_kafka_conf_t, and initialize it with specific values, like this:

rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set (conf, "bootstrap.servers", "localhost:9092", ...);
Note:
The ellipses [...] in these listings indicate places where I've left out code that is not necessary for the explanation; in this case it is related to error handling. Of course, the full source code does not omit these elements.
Note:
"bootstrap.servers" is the only compulsory configuration setting. However, it's advisable to allow users to set other values, perhaps from a configuration file, as the exact settings for best performance often cannot be known at compilation time.

The conf object is then passed as an argument when creating the rd_kafka object for producing messages.
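
In outline, creating the producer looks like this (a sketch, with most error handling trimmed). Note that, on success, rd_kafka_new() takes ownership of the conf object -- this is one of the ownership transfers mentioned earlier, so the application must not destroy conf itself afterwards:

char errstr[512];
rd_kafka_t *producer = rd_kafka_new (RD_KAFKA_PRODUCER, conf,
    errstr, sizeof (errstr));
if (!producer)
  {
  // On failure only, ownership of conf remains with the caller
  fprintf (stderr, "Failed to create producer: %s\n", errstr);
  rd_kafka_conf_destroy (conf);
  }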

Sending a message

There are various ways to send a message. The simplest is probably to call rd_kafka_producev(), which takes a topic name, a key, and a value. This method will return very quickly because, in fact, it does not wait for the message to be sent. The actual send happens asynchronously.
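
In outline, a call looks like this (the topic name and payload are just placeholders). The RD_KAFKA_V_... macros tag each argument, the list must be terminated with RD_KAFKA_V_END, and RD_KAFKA_MSG_F_COPY tells the library to take its own copy of the payload:

rd_kafka_producev (producer,
    RD_KAFKA_V_TOPIC ("my-topic"),
    RD_KAFKA_V_KEY ("key", 3),
    RD_KAFKA_V_VALUE ("Hello", 5),
    RD_KAFKA_V_MSGFLAGS (RD_KAFKA_MSG_F_COPY),
    RD_KAFKA_V_END);

To wait for all pending sends to complete, we can do this: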

rd_kafka_flush (producer, 1000); // milliseconds

If there is a problem communicating with the Kafka server, this is most likely the point at which it will become obvious -- messages will remain queued and never get sent. We can check for this kind of error by testing the producer's queue length which, after a few seconds, ought to be zero:

int unsent = rd_kafka_outq_len (producer);
if (unsent > 0)
  {
  // Handle error -- some messages are unsent
  }

Error handling and logging

The rdkafka API uses a number of different error-handling mechanisms. First, there is the traditional return-value method that is widespread in C programming. With this approach, a function call returns zero if it succeeds, and a number that indicates the cause of the error if it does not. The rd_kafka_producev() method, for example, works this way. The function rd_kafka_err2str() provides a textual representation of the error code, if necessary.
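
For example, a minimal check on the result of a send might look like this:

rd_kafka_resp_err_t err = rd_kafka_producev (producer, ...);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
  {
  fprintf (stderr, "Send failed: %s\n", rd_kafka_err2str (err));
  }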

Some functions provide a textual error message directly. For example, the rd_kafka_new() function takes a char * as an argument, along with an integer size argument. In the event of error, the buffer that the char * references will be filled in with text, up to the limit set by the length.

Finally, the rdkafka library logs error conditions directly to stderr. A simple program, therefore, could get along without doing any error reporting at all. Of course, it would still have to test for errors, and take action accordingly. In a more complex application, you'd probably want to capture the rdkafka error messages, and combine them with other logging into a single reporting mechanism. The rd_kafka_conf_set_log_cb() function sets a new handler for library-generated log messages.
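
A sketch of such a callback (the function name and message format here are my own invention) might look like this:

static void my_log_cb (const rd_kafka_t *rk, int level,
                       const char *fac, const char *buf)
  {
  // Forward library-generated log messages to our own logging
  fprintf (stderr, "rdkafka [%d] %s: %s\n", level, fac, buf);
  }

// Install the callback on the conf object, before creating the producer
rd_kafka_conf_set_log_cb (conf, my_log_cb);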

Compiling the simple C program

All that needs to be done, apart from the usual compilation steps, is to link with the rdkafka library. On Linux, for example:

$ gcc -o myprog myprog.c -lrdkafka
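
Alternatively, if the library's pkg-config metadata is installed (the package name is rdkafka -- an assumption worth checking on your system), pkg-config can supply the necessary flags:

$ gcc -o myprog myprog.c $(pkg-config --cflags --libs rdkafka)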

Testing

My test application writes to a particular topic on a particular Kafka installation. To run it, you'd first need to edit the source code to define the Kafka bootstrap URI and the topic. Then you would need some way to check that messages are actually being produced correctly. For example, you could log into one of the Kafka broker hosts and run kafka-console-consumer.sh directly, like this:

$ /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server :9092 --topic my-topic
Hello
World
...

rdkafka licensing

rdkafka is an open-source library, made from components maintained by different contributors. Its licence is a little more complex than will be found in a project that is covered by a single, global licence like GPL. All the components are licensed to allow distribution in source and binary form, so in most cases this slight additional complexity won't be a problem. However, it's probably advisable to read the various licence documents in the source code, if you're planning a substantial project based on rdkafka.

Serializers and compatibility

If you're familiar with the Kafka client library for Java, you might be struck by the lack of mention of serializers in rdkafka. Fundamentally, Kafka keys and values are just sequences of bytes -- Kafka does not impose any particular structure on them. It's up to the clients to agree on how these bytes should be interpreted. This can be problematic when the Kafka clients are based on different programming platforms.

Java requires serializers because the internal representation of data types in the JVM is opaque. For example, Java stores String objects in a format that is based on UTF-16 encoding but, in fact, this format is never directly exposed. To store a Java string in some other location, external to the JVM, requires it to be converted to some known format. So when we use the StringSerializer class in a Java Kafka application, we're actually converting Java strings to some other format. As it happens, this format is UTF-8. I stress that this UTF-8 convention is imposed by the client -- Kafka does not care how data is represented.

The simple text program I have used for this article writes a string in, essentially, ASCII format. ASCII is a subset of UTF-8 and, therefore, a Java-based Kafka application would read the message payloads sent by the C program correctly. UTF-8 is a good choice for a string encoding, because it is so widely supported. However, I want to reiterate that Kafka itself imposes no structure on the data it carries.

Java applications very often use Apache Avro to serialize complex data structures. Avro is widely used with Kafka, and the Kafka client library for Java has built-in support for this. An Avro library is also available for C, although it does not yet support the same range of platforms that rdkafka does. The problem is that C data types are not opaque, as Java's are: C developers are used to working with raw data in memory. This means that conversion from one platform to another, or even from one C compiler to another on the same platform, is often problematic.
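
As a hypothetical illustration, nothing about the in-memory layout of the following struct is fixed by the C standard -- alignment, padding, and byte order can all differ from one compiler or platform to another:

struct order
  {
  char id;       // one byte...
  int quantity;  // ...but the compiler will usually insert padding before this field
  };

// Writing the raw bytes of this struct to Kafka would therefore produce
// messages that clients on other platforms cannot reliably decode.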

In short: unless your application is developed to work with raw bytes, you'll have to be careful to ensure that your Java and C applications can use a common format for data representation. XML encoded with UTF-8 is a pretty safe choice, but it's a lot less efficient than Avro's binary encoding.

Closing remarks

If you're familiar with programming for Kafka in Java, you'll find few surprises in rdkafka -- except, perhaps, the absence of serializers. Even without that experience, rdkafka is pretty easy to use -- at least for simple applications.