Capturing database change records using Debezium and Camel

The notion of "change data capture" (CDC) is becoming increasingly significant in an IT industry that stores and manages an ever-increasing volume of data. Many businesses rely on assemblies of relational databases, often from different vendors, handling different kinds of data. A unified framework for collecting and processing real-time change records from databases is therefore likely to be useful.

Some database vendors have proprietary CDC frameworks, like Oracle's GoldenGate. Unfortunately, as well as being single-vendor, these frameworks often do not work well with open-source integration tools.

This article describes how to use Debezium -- an open-source CDC framework -- with Apache Camel to collect and process change records from a database. The example uses PostgreSQL, because it is widely available and relatively easy to configure in a way that is compatible with Debezium. However, other databases are supported, and will integrate with Camel in the same way.

This article and its accompanying source code present a trivially simple example of using Debezium with Camel -- it's the "Hello, World" of this kind of application. I'm quietly assuming a Linux environment but, in fact, nothing in the example is really Linux-specific.

About the technologies

Debezium

Debezium is a Java-based framework for extracting change records from various relational databases, and making them available to applications. Debezium is often mentioned in the same breath as Apache Kafka and, indeed, these technologies are often used together. It makes sense to use Debezium to capture change records and publish them to Kafka, where they can be distributed to other application components for processing.

It's also common to see Debezium and Kafka used in containers based on Kubernetes or OpenShift. Again, this is a reasonable method of deployment. However, this article describes the use of Debezium as a data source for Camel. The simple application need not be deployed in a container, and it doesn't use Kafka. It just uses Camel's Debezium support, in a stand-alone Java application.

At the time of writing, Debezium has full support for PostgreSQL, MongoDB, MySQL, and Microsoft SQL Server. There is also preliminary support for Oracle, but licensing constraints make it unclear whether there is a strong future for Oracle integration.

To build Debezium support into a Java application, we need to include both the core components, and the specific database connector. For a Maven build, using PostgreSQL, we might use the following in pom.xml:

<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-connector-postgres</artifactId>
  <version>${debezium.version}</version>
</dependency>
<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-embedded</artifactId>
  <version>${debezium.version}</version>
</dependency>
<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-core</artifactId>
  <version>${debezium.version}</version>
</dependency>

Apache Camel

Camel is an open-source, Java-based integration framework. A Camel application consists of one or more routes, each of which consumes data from, and sends data to, one or more endpoints. Camel routes can be specified in Java, naturally enough, but also in a number of other formats, including XML and YAML.

Camel endpoints are defined as URIs, where the scheme part of the URI specifies a particular protocol for consuming or producing data. Each of these endpoints is supported by a particular component or library.

The skeleton of the Camel application described in this article looks like this:

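import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

CamelContext camelContext = new DefaultCamelContext();
camelContext.addRoutes (new RouteBuilder()
  {
  @Override
  public void configure()
    {
    // Consume change records from Debezium and log each one
    from ("debezium-postgres:my_database?...")
      .to ("log://foo");
    }
  });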

That is, it defines one route that consumes data from a debezium-postgres endpoint, and produces it to a log endpoint. Note that the format of the data that is consumed from Debezium is completely opaque here -- which is not a problem in this simple application, because all we're doing is logging it.

To use Camel in a Java application, we need to include libraries for the main Camel framework, and also for each of the endpoints in use. In a Maven pom.xml for this example we need:

<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-core-engine</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-log</artifactId>
</dependency>

We'll also need support for the Debezium endpoint, as described later.

Camel's Debezium support

Debezium support is relatively new in Camel -- since version 3.0. There are (at the time of writing) specific Camel endpoints for the four database types that Debezium fully supports: debezium-postgres, debezium-mongodb, debezium-mysql, and debezium-sqlserver.

The four different endpoints are configured in broadly the same way, and present data to Camel in the same format.

To include support for the Camel debezium-postgres endpoint we need the following dependency:

<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-debezium-postgres</artifactId>
</dependency>

How Debezium works with PostgreSQL (and others, to some extent)

On the whole, relational databases do not expose APIs for change data capture -- or, if they do, it is for proprietary implementations. The way the Debezium PostgreSQL connector works is to hook into the database's replication infrastructure.

Replication in PostgreSQL -- and elsewhere -- works by copying the write-ahead log (WAL) of the primary database instance. The WAL is a kind of transaction log that records every change made to the database's data. It is only the WAL that needs to be flushed to disk before a transaction can be considered complete -- although other database files will need to be updated, these updates need not be immediate, so they can be batched. This approach works because, in the event of a failure, the state of all database files can be reconstructed from the WAL.

So when a PostgreSQL database is replicated, it's only the WAL that has to be transferred from the primary to the backup instances.

The Debezium connector for PostgreSQL connects to the database as if it were a replication client. It receives updates to the WAL just as a replication client would, and these records are parsed to determine the kind of change that occurred.

There isn't space in this article to describe PostgreSQL configuration in detail. In outline, to use Debezium we must (a) enable the WAL for replication, (b) create a user with access rights to act as a replication client, and (c) enable network access for this user.

WAL configuration is in the file postgresql.conf (usually in /var/lib/pgsql/data/). The following settings are probably satisfactory for testing purposes:

wal_level = logical
max_wal_senders = 4
max_replication_slots = 4
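
As a quick sanity check, a few lines of Java can confirm that a configuration file contains settings like these. What follows is only a minimal sketch, and is not part of the accompanying source code: the class WalSettingsCheck and its methods are hypothetical names, and the parser handles only simple "name = value" lines with # comments. It looks only at the text it is given, so it will not see values set elsewhere (by ALTER SYSTEM, for example). It insists on wal_level being set to logical, but does not complain if the two max_ settings are absent, because their defaults vary between PostgreSQL versions.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

final class WalSettingsCheck {

    /** Parses simple "name = value" lines, skipping blank lines and '#' comments. */
    static Map<String, String> parse(String confText) {
        Map<String, String> settings = new HashMap<>();
        for (String line : confText.split("\\R")) {
            // Drop anything after a '#'. (Assumption: no '#' inside quoted values.)
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash);
            }
            line = line.trim();
            if (line.isEmpty()) {
                continue;
            }
            // The '=' is optional in postgresql.conf, so split on '=' or on whitespace.
            String[] parts = line.split("\\s*=\\s*|\\s+", 2);
            if (parts.length == 2) {
                String name = parts[0].toLowerCase(Locale.ROOT);
                String value = parts[1].replace("'", "").trim();
                settings.put(name, value);
            }
        }
        return settings;
    }

    /** True if wal_level is logical and any explicit sender/slot limits are above zero. */
    static boolean looksUsable(Map<String, String> settings) {
        return "logical".equals(settings.get("wal_level"))
            && absentOrPositive(settings.get("max_wal_senders"))
            && absentOrPositive(settings.get("max_replication_slots"));
    }

    private static boolean absentOrPositive(String value) {
        if (value == null) {
            return true; // Not set in this file: leave it to the server default
        }
        try {
            return Integer.parseInt(value) > 0;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String conf = "wal_level = logical\nmax_wal_senders = 4\nmax_replication_slots = 4\n";
        System.out.println("Usable for Debezium: " + looksUsable(parse(conf)));
    }
}
```

In practice the text would be read from the real postgresql.conf; the literal string in main() just keeps the sketch self-contained.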

To act as a replication client, Debezium will need to be able to connect to the PostgreSQL database as a user with elevated privileges -- typically the SUPERUSER role. This user may, or may not, be the user that owns the tables whose changes are to be captured. In practice, Debezium will probably connect to the database as an administrative user.

Debezium will make a network connection to PostgreSQL, even if it is running on the same host as the database. PostgreSQL has a mechanism to use Unix sockets, rather than IP, in this local mode of interaction, but Debezium won't use it. This means that the user that Debezium supplies for the database connection must be configured with an authentication method that works over a network connection, such as "ident", rather than the "peer" method normally used for local users, which is only available for Unix-socket connections. Authentication settings are set in the file pg_hba.conf.

For my simple tests I am using a local PostgreSQL installation, and I've created two users: an unprivileged user kevin that will own the database to be monitored, and a privileged user admin that will own no databases, but has the SUPERUSER role. It is this admin user that Debezium will use to connect to the database.

With this simple configuration, my pg_hba.conf looks like this:

# THESE ARE NOT PRODUCTION SETTINGS!!
local  replication   all                     peer
host   replication   all       127.0.0.1/32  ident
host   replication   all       ::1/128       ident

local  all           admin                   trust
host   all           admin     127.0.0.1/32  trust
host   all           admin     ::1/128       trust

local  all           kevin                   trust
host   all           kevin     127.0.0.1/32  trust
host   all           kevin     ::1/128       trust

I've noticed that the standard package install of PostgreSQL on Red Hat/Fedora Linux systems does not include the oidentd daemon, which PostgreSQL's "ident" method relies on to authenticate network connections. It can be installed using:

# yum install oidentd
# systemctl start oidentd

If the user that Debezium uses to connect to the database does not have the elevated privileges needed to act as a replication client, you'll see an exception like this:

org.postgresql.util.PSQLException: FATAL: must be superuser or replication role to start walsender

To test Debezium, the database must have at least one table. For the purposes of this simple example, it doesn't matter (much) what the structure of the table is, because all the application does is log changes. I'm using the user kevin as the owner of the database, so I create the table like this:

$ psql -h localhost -U kevin
kevin=> create table test (id varchar primary key, foo varchar);

When I say it doesn't matter much what structure the table has, I should point out that there is at least one requirement: the table must have a primary key. The key is used to coordinate replication events, and (unless specific extra steps are taken) replication won't work without it. If there is no primary key, then an attempt to update a database table will fail with this error message:

ERROR: cannot update table "test" because it does not have a replica identity and publishes updates

A sample application

The full source code of this simple application is in my GitHub repository. In this article, I will only describe the Camel route. I've already explained the skeleton of the route; here is the endpoint definition in detail.

  from ("debezium-postgres:my_postgres_endpoint?"
    + "databaseHostname=localhost"
    + "&databasePort=5432"
    + "&databaseUser=admin"
    + "&databasePassword=admin"
    + "&databaseDbname=kevin"
    + "&databaseServerName=localhost"
    + "&schemaWhitelist=public"
    + "&tableWhitelist=public.test"
    + "&offsetStorageFileName=/tmp/offset.dat"
    + "&offsetFlushIntervalMs=10000"
    + "&pluginName=pgoutput")
          .to ("log://foo"); // Just log each message.

In the full source code, all these endpoint settings are in a properties file, for ease of configuration; I'm showing literal values above for ease of explanation.
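
To give an idea of how the properties-file approach might look, here is a minimal sketch that assembles the endpoint URI from a java.util.Properties object. It is not the code from the repository: the class name DebeziumEndpointUri and the method build() are hypothetical, and I'm assuming that each property name matches an endpoint option exactly. The sketch does no escaping, so a value containing a character such as & would need extra handling.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

final class DebeziumEndpointUri {

    /** Builds "debezium-postgres:<name>?key1=value1&key2=value2" from the properties. */
    static String build(String endpointName, Properties props) {
        // Sort the option names so that the generated URI is the same on every run
        Map<String, String> sorted = new TreeMap<>();
        for (String name : props.stringPropertyNames()) {
            sorted.put(name, props.getProperty(name));
        }
        StringBuilder uri = new StringBuilder("debezium-postgres:").append(endpointName);
        char separator = '?';
        for (Map.Entry<String, String> option : sorted.entrySet()) {
            uri.append(separator).append(option.getKey()).append('=').append(option.getValue());
            separator = '&';
        }
        return uri.toString();
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        // A real application would load these from a file, e.g. with a FileReader
        props.load(new StringReader("databaseHostname=localhost\ndatabasePort=5432\n"));
        System.out.println(build("my_postgres_endpoint", props));
    }
}
```

The resulting string can be passed directly to from() in the route definition, in place of the concatenated literal shown above.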

As the Camel documentation delights in telling us, the debezium-postgres endpoint has 84 options. We won't need all of them in this simple example, but there are some subtle configuration elements that make it worth reading the documentation in detail.

Although it's common for Camel endpoints to take a hostname as part of the main URI specification (as with HTTP URIs), the Debezium endpoints don't work that way. In the URI debezium-postgres:my_postgres_endpoint..., the identifier my_postgres_endpoint does not define a hostname -- it is an internal name that identifies the endpoint in stored state (see below).

The actual database connection parameters are databaseHostname and databasePort. The parameter databaseServerName, although mandatory, is not a server hostname. This parameter is used to identify a particular Debezium configuration in stored state (see below).

databaseDbname is the name of the PostgreSQL database to monitor; that it has the same name as a database user in this example is merely coincidence.

schemaWhitelist and tableWhitelist, if provided, filter the list of tables to be monitored. Without these parameters, Debezium is able to get a list of tables in the specified database, and monitor them all (although, of course, it is the WAL that is being monitored, rather than the tables themselves). There are corresponding 'blacklist' options for situations where it's easier to select tables to exclude, rather than to include.

offsetStorageFileName is a file in which the Camel Debezium endpoint stores its state. The word 'offset' here refers to an offset in the database server's write-ahead log (WAL). This information needs to be stored because, without it, if the application were restarted it would potentially reprocess the entire WAL. The Debezium endpoint has various built-in methods for storing state, including in Kafka topics. Alternatively, you can provide a Java class with a custom implementation.

offsetFlushIntervalMs is the time in milliseconds between repeated storage of the endpoint's state. There are various other parameters that control behaviour if the state can't be stored, but these are unlikely to be relevant for simple file-based storage.

pluginName identifies the specific method that will be used to stream replication data from the server's WAL; that is, it identifies a particular format for change records on the wire. PostgreSQL supports a number of such formats; pgoutput is the one that it uses internally for replication, so it doesn't require the installation of any additional dependencies. The differences between these different 'plugin' settings are quite subtle; they affect, for example, how database data types are mapped onto Java data types. This can be important, in particular, if you're handling numeric data.

Running the application

The simple Camel application produces no output -- apart from normal internal logging -- until Debezium detects a change to the specified database table. Making the following database changes, for example:

$ psql -h localhost -U kevin
kevin=> insert into test values ('a1', 'hello');
INSERT 0 1
kevin=> update test set foo='world' where id='a1';
UPDATE 1

results in this output from the program:

2020-10-07 14:37:48,194 [ebeziumConsumer] INFO foo - Exchange[ExchangePattern: InOnly, BodyType: org.apache.kafka.connect.data.Struct, Body: Struct{id=a1,foo=hello}]
2020-10-07 14:38:32,181 [ebeziumConsumer] INFO foo - Exchange[ExchangePattern: InOnly, BodyType: org.apache.kafka.connect.data.Struct, Body: Struct{id=a1,foo=world}]

Processing the change records

Each change record is delivered to the Camel route as a single Camel Exchange (essentially, a message). The message body will be an object of type org.apache.kafka.connect.data.Struct. The Struct class has a number of methods for extracting the individual database columns, and identifying the type of change that occurred. Processing this information isn't the responsibility of Debezium -- you can use whatever facilities Camel provides.
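
Debezium labels each change event with a single-letter operation code: c for a create (insert), u for an update, d for a delete, and r for a row read during the initial snapshot. Camel's Debezium endpoints pass this code along in the message header CamelDebeziumOperation. The enum below is a small sketch of how a route might turn that code into something more readable. The name ChangeType is hypothetical, and the codes are taken from the Debezium documentation rather than from the example application, so treat this as an illustration rather than a definitive mapping.

```java
enum ChangeType {
    CREATE("c"),
    UPDATE("u"),
    DELETE("d"),
    SNAPSHOT_READ("r"),
    UNKNOWN("?");

    private final String code;

    ChangeType(String code) {
        this.code = code;
    }

    /** Maps a header value such as "c" or "u" to a ChangeType; anything else is UNKNOWN. */
    static ChangeType fromCode(Object headerValue) {
        if (headerValue != null) {
            String text = headerValue.toString();
            for (ChangeType type : values()) {
                if (type.code.equals(text)) {
                    return type;
                }
            }
        }
        return UNKNOWN;
    }

    public static void main(String[] args) {
        // Simulated header values, standing in for what a route would receive
        for (String code : new String[] { "c", "u", "d", "r" }) {
            System.out.println(code + " -> " + fromCode(code));
        }
    }
}
```

Inside a Camel Processor, the value to pass to fromCode() would come from exchange.getIn().getHeader("CamelDebeziumOperation"), while the column values themselves can be read from the Struct body with methods such as getString("foo").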

You could, for example, define a converter that creates a custom data-transfer object from the Struct, and integrate it into Camel's built-in data conversion framework. You could, if you wished, use Camel's built-in parallel-processing machinery to process changes concurrently -- although care needs to be taken here if the ordering of events is significant. Camel has built-in support for sending data to message brokers, to web services, or even to another database. You could even use Camel's Kafka support to publish the change records to a Kafka topic. However, the Kafka Connect framework might provide a more elegant way of using Kafka with Debezium.
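
To illustrate the point about ordering, one common way to process changes concurrently without reordering the changes to any one row is to route every event with the same primary key to the same single-threaded worker. The following is a minimal sketch of that idea in plain Java. It does not use Camel's own concurrency features, and the class KeyedExecutor and its methods are hypothetical names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class KeyedExecutor {

    // Each lane is a single-threaded executor, so tasks within a lane run in order
    private final List<ExecutorService> lanes = new ArrayList<>();

    KeyedExecutor(int laneCount) {
        for (int i = 0; i < laneCount; i++) {
            lanes.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Runs the task on the lane selected by the key; equal keys always share a lane. */
    void submit(Object key, Runnable task) {
        int lane = Math.floorMod(key.hashCode(), lanes.size());
        lanes.get(lane).execute(task);
    }

    /** Stops accepting work and waits for queued tasks; returns false on timeout. */
    boolean shutdownAndWait(long timeoutSeconds) {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        try {
            boolean finished = true;
            for (ExecutorService lane : lanes) {
                finished &= lane.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
            }
            return finished;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        KeyedExecutor executor = new KeyedExecutor(4);
        // Two changes to row a1 stay in order; the change to row b2 may run alongside them
        executor.submit("a1", () -> System.out.println("a1: insert"));
        executor.submit("b2", () -> System.out.println("b2: insert"));
        executor.submit("a1", () -> System.out.println("a1: update"));
        executor.shutdownAndWait(5);
    }
}
```

This preserves ordering only per key: changes to different rows may still be processed in a different order from the one in which they were committed, which may or may not matter for a particular application.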