Capturing database change records using Debezium and Camel
The notion of "change data capture" (CDC) is becoming increasingly significant, in an IT industry that stores and manages an ever-increasing volume of data. Many businesses rely on assemblies of relational databases, often from different vendors, handling different kinds of data. A unified framework for collecting and processing real-time change records from databases is therefore likely to be useful.
Some database vendors have proprietary CDC frameworks, like Oracle's GoldenGate. Unfortunately, as well as being single-vendor, these frameworks often do not work well with open-source integration tools.
This article describes how to use Debezium -- an open-source CDC framework -- with Apache Camel to collect and process change records from a database. The example uses PostgreSQL, because it is widely available and relatively easy to configure in a way that is compatible with Debezium. However, other databases are supported, and will integrate with Camel in the same way.
The article, and the accompanying source code, presents a trivially-simple example of using Debezium with Camel -- it's the "Hello, World" of this kind of application. I'm quietly assuming a Linux environment but, in fact, nothing in the example is really Linux-specific.
About the technologies
Debezium
Debezium is a Java-based framework for extracting change records from various relational databases, and making them available to applications. Debezium is often mentioned in the same breath as Apache Kafka and, indeed, these technologies are often used together. It makes sense to use Debezium to capture change records and publish them to Kafka, where they can be distributed to other application components for processing.
It's also common to see Debezium and Kafka used in containers based on Kubernetes or OpenShift. Again, this is a reasonable method of deployment. However, this article describes the use of Debezium as a data source for Camel. The simple application need not be deployed in a container, and it doesn't use Kafka. It just uses Camel's Debezium support, in a stand-alone Java application.
At the time of writing, Debezium has full support for PostgreSQL, MongoDB, MySQL, and Microsoft SQL Server. There is also preliminary support for Oracle, but licensing constraints make it unclear whether there is a strong future for Oracle integration.
To build Debezium support into a Java application, we need to include both the core components, and the specific database connector. For a Maven build, using PostgreSQL, we might use the following in pom.xml:
<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-connector-postgres</artifactId>
  <version>${debezium.version}</version>
</dependency>
<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-embedded</artifactId>
  <version>${debezium.version}</version>
</dependency>
<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-core</artifactId>
  <version>${debezium.version}</version>
</dependency>
Apache Camel
Camel is an open-source, Java-based integration framework. A Camel application consists of one or more routes, each of which consumes data from, and sends data to, one or more endpoints. Camel routes can be specified in Java, naturally enough, but also in a number of other formats, including XML and YAML.
Camel endpoints are defined as URIs, where the scheme part of the URI specifies a particular protocol for consuming or producing data. Each of these endpoints is supported by a particular component or library.
The skeleton of the Camel application described in this article looks like this:
CamelContext camelContext = new DefaultCamelContext();
camelContext.addRoutes (new RouteBuilder()
  {
  public void configure()
    {
    from ("debezium-postgres:my_database?...")
      .to ("log://foo");
    }
  });
That is, it defines one route that consumes data from a debezium-postgres endpoint, and produces it to a log endpoint. Note that the format of the data consumed from the Debezium endpoint is completely opaque here -- which is not a problem in this simple application, because all we're doing is logging it.
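The skeleton above only defines the route; to actually process change records, the CamelContext must be started, and the JVM kept alive. A minimal sketch -- blocking the main thread with join() is just one simple way to prevent the JVM from exiting:

camelContext.start();
// The route runs on background threads; block the main thread
// so the JVM does not exit immediately
Thread.currentThread().join();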
To use Camel in a Java application, we need to include libraries for the main Camel framework, and also for each of the endpoints in use. In a Maven pom.xml for this example we need:
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-core-engine</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-log</artifactId>
</dependency>
We'll also need support for the Debezium endpoint, as described later.
Camel's Debezium support
Debezium support is relatively new in Camel, having been available only since version 3.0. There are (at the time of writing) specific Camel endpoints for the four database types that Debezium fully supports: debezium-postgres, debezium-mongodb, debezium-mysql, and debezium-sqlserver.
The four different endpoints are configured in broadly the same way, and present data to Camel in the same format.
To include support for the Camel debezium-postgres endpoint we need the following dependency:
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-debezium-postgres</artifactId>
</dependency>
How Debezium works with PostgreSQL (and others, to some extent)
On the whole, relational databases do not expose APIs for change data capture -- or, if they do, the APIs are proprietary. The way the Debezium PostgreSQL connector works is to hook into the database's replication infrastructure.
Replication in PostgreSQL -- and elsewhere -- works by copying the write-ahead log (WAL) of the primary database instance. The WAL is a kind of transaction log that records each interaction between the database and its clients. It is only the WAL that needs to be flushed to disk before a transaction can be considered complete -- although other database files will need to be updated, these updates need not be immediate, so they can be batched. This approach works because, in the event of a failure, the state of all database files can be reconstructed from the WAL.
So when a PostgreSQL database is replicated, it's only the WAL that has to be transferred from the primary to the backup instances.
The Debezium connector for PostgreSQL connects to the database as if it were a replication client. It receives updates to the WAL just as a replication client would, and these records are parsed to determine the kind of change that occurred.
There isn't space in this article to describe PostgreSQL configuration in detail. In outline, to use Debezium we must (a) enable the WAL for replication, (b) create a user with access rights to act as a replication client, and (c) enable network access for this user.
WAL configuration is in the file postgresql.conf (usually in /var/lib/pgsql/data/). The following settings are probably satisfactory for testing purposes:
wal_level = logical
max_wal_senders = 4
max_replication_slots = 4
To act as a replication client, Debezium will need to be able to connect to the PostgreSQL database as a user with elevated privileges -- typically the SUPERUSER role. This user may, or may not, be the user that owns the tables whose changes are to be captured. In practice, Debezium will probably connect to the database as an administrative user.
Debezium will make a network connection to PostgreSQL, even if it is running on the same host as the database. PostgreSQL has a mechanism to use Unix sockets, rather than IP, in this local mode of interaction, but Debezium won't use it. This means that the user which Debezium supplies for the database connection must be configured to use the "ident" authentication method, not the "peer" mode of authentication normally used for local users.
Authentication settings are set in the file pg_hba.conf.
For my simple tests I am using a local PostgreSQL installation, and I've created two users: an unprivileged user kevin that will own the database to be monitored, and a privileged user admin that will own no databases, but has the SUPERUSER role. It is this admin user that Debezium will use to connect to the database.
With this simple configuration, my pg_hba.conf looks like this:
# THESE ARE NOT PRODUCTION SETTINGS!!
local   replication   all                        peer
host    replication   all     127.0.0.1/32       ident
host    replication   all     ::1/128            ident
local   all           admin                      trust
host    all           admin   127.0.0.1/32       trust
host    all           admin   ::1/128            trust
local   all           kevin                      trust
host    all           kevin   127.0.0.1/32       trust
host    all           kevin   ::1/128            trust
I've noticed that the standard package install of PostgreSQL on Red Hat/Fedora Linux systems does not include the oidentd authentication daemon that PostgreSQL uses for network authentication. It can be installed and started using:
# yum install oidentd
# systemctl start oidentd
If the user that Debezium uses to connect to the database does not have the elevated privileges needed to act as a replication client, you'll see an exception like this:
org.postgresql.util.PSQLException: FATAL: must be superuser or replication role to start walsender
To test Debezium, the database must have at least one table. For the purposes of this simple example, it doesn't matter (much) what the structure of the table is, because all the application does is log changes. I'm using the user kevin as the owner of the database, so I create the table like this:
$ psql -h localhost -U kevin
kevin=> create table test (id varchar primary key, foo varchar);

When I say it doesn't matter much what structure the table has, I should point out that there is at least one requirement: the table must have a primary key. The key is used to coordinate replication events, and (unless specific extra steps are taken) replication won't work without it. If there is no primary key, then an attempt to update a database table will fail with this error message:
ERROR: cannot update table "test" because it does not have a replica identity and publishes updates
A sample application
The full source code of this simple application is in my GitHub repository. In this article, I will only describe the Camel route. I've already explained the skeleton of the route; here is the endpoint definition in detail:

from ("debezium-postgres:my_postgres_endpoint?"
    + "databaseHostname=localhost"
    + "&databasePort=5432"
    + "&databaseUser=admin"
    + "&databasePassword=admin"
    + "&databaseDbname=kevin"
    + "&databaseServerName=localhost"
    + "&schemaWhitelist=public"
    + "&tableWhitelist=public.test"
    + "&offsetStorageFileName=/tmp/offset.dat"
    + "&offsetFlushIntervalMs=10000"
    + "&pluginName=pgoutput")
  .to ("log://foo"); // Just log each message.
In the full source code, all these endpoint settings are in a properties file, for ease of configuration; I'm showing literal values above for ease of explanation.
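As a sketch of that approach -- the file name and property keys here are invented for illustration, and don't come from the sample code:

import java.io.FileInputStream;
import java.util.Properties;

// Load endpoint settings from a properties file (hypothetical name)
Properties props = new Properties ();
props.load (new FileInputStream ("debezium.properties"));

String uri = "debezium-postgres:my_postgres_endpoint?"
    + "databaseHostname=" + props.getProperty ("db.host")
    + "&databasePort=" + props.getProperty ("db.port")
    + "&databaseUser=" + props.getProperty ("db.user");
    // ...and so on, for the remaining options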
As the Camel documentation delights in telling us, the debezium-postgres endpoint has 84 options. We won't need all of them in this simple example, but there are some subtle configuration elements that make it worth reading the documentation in detail.
Although it's common for Camel endpoints to take a hostname as part of the main URI specification (as with HTTP URIs), the Debezium endpoints don't work that way. In the URI debezium-postgres:my_postgres_endpoint..., the identifier my_postgres_endpoint does not define a hostname -- it is an internal name that identifies the endpoint in stored state (see below).
The actual database connection parameters are databaseHostname and databasePort.
The parameter databaseServerName, although mandatory, is not a server hostname. This parameter is used to identify a particular Debezium configuration in stored state (see below).
databaseDbname is the name of the PostgreSQL database to monitor; that it has the same name as a database user in this example is merely coincidence.
schemaWhitelist and tableWhitelist, if provided, filter the list of tables to be monitored. Without these parameters, Debezium is able to get a list of tables in the specified database, and monitor them all (although, of course, it is the WAL that is being monitored, rather than the tables themselves). There are corresponding 'blacklist' options for situations where it's easier to select tables to exclude, rather than to include.
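As an illustration -- and treat the option names as assumptions, since I have only used the whitelist forms myself -- excluding tables rather than including them might look like this:

from ("debezium-postgres:my_postgres_endpoint?"
    + "..." // connection and offset options as before
    + "&schemaBlacklist=legacy" // hypothetical schema to exclude
    + "&tableBlacklist=public.audit_log") // hypothetical table to exclude
  .to ("log://foo");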
offsetStorageFileName is a file in which the Camel Debezium endpoint stores its state. The word 'offset' here refers to an offset in the database server's write-ahead log (WAL). This information needs to be stored because, without it, if the application were restarted it would potentially reprocess the entire WAL. The Debezium endpoint has various built-in methods for storing state, including in Kafka topics. Alternatively, you can provide a Java class with a custom implementation.
offsetFlushIntervalMs is the time in milliseconds between repeated storage of the endpoint's state. There are various other parameters that control behaviour if the state can't be stored, but these are unlikely to be relevant for simple file-based storage.
pluginName identifies the specific method that will be used to stream replication data from the server's WAL; that is, it identifies a particular format for change records on the wire. PostgreSQL supports a number of such formats; pgoutput is the one that it uses internally for replication, so it doesn't require the installation of any additional dependencies. The differences between these different 'plugin' settings are quite subtle; they affect, for example, how database data types are mapped onto Java data types. This can be important, in particular, if you're handling numeric data.
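For instance, Debezium's decimal.handling.mode property -- exposed by the Camel endpoint, if I read the option list correctly, as decimalHandlingMode -- controls whether numeric columns are delivered as precise binary-encoded values, as doubles, or as strings. A hedged sketch:

// Deliver NUMERIC/DECIMAL columns as strings, often the least
// surprising representation for downstream processing
from ("debezium-postgres:my_postgres_endpoint?"
    + "..." // other options as before
    + "&decimalHandlingMode=string")
  .to ("log://foo");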
Running the application
The simple Camel application produces no output -- apart from normal internal logging -- until Debezium detects a change to the specified database table. Making the following database changes, for example:
$ psql -h localhost -U kevin
kevin=> insert into test values ('a1', 'hello');
INSERT 0 1
kevin=> update test set foo='world' where id='a1';
UPDATE 1
results in this output from the program:
2020-10-07 14:37:48,194 [ebeziumConsumer] INFO foo - Exchange[ExchangePattern: InOnly, BodyType: org.apache.kafka.connect.data.Struct, Body: Struct{id=a1,foo=hello}]
2020-10-07 14:38:32,181 [ebeziumConsumer] INFO foo - Exchange[ExchangePattern: InOnly, BodyType: org.apache.kafka.connect.data.Struct, Body: Struct{id=a1,foo=world}]
Processing the change records
Each change record is delivered to the Camel route as a single Camel Exchange (essentially, a message). The message body will be an object of type org.apache.kafka.connect.data.Struct. The Struct class has a number of methods for extracting the individual database columns, and identifying the type of change that occurred. Processing this information isn't the responsibility of Debezium -- you can use whatever facilities Camel provides.
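For example, an inline processor can extract column values by field name. This is a minimal sketch, assuming the test table from earlier, with its id and foo columns:

import org.apache.kafka.connect.data.Struct;

from ("debezium-postgres:my_postgres_endpoint?...")
  .process (exchange ->
    {
    Struct body = exchange.getMessage().getBody (Struct.class);
    // Individual column values can be fetched by field name
    String id = body.getString ("id");
    String foo = body.getString ("foo");
    System.out.println ("Change captured: id=" + id + ", foo=" + foo);
    })
  .to ("log://foo");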
You could, for example, define a converter that creates a custom data-transfer object from the Struct, and integrate it into Camel's built-in data conversion framework. You could, if you wished, use Camel's built-in parallel-processing machinery to process changes concurrently -- although care needs to be taken here if the ordering of events is significant. Camel has built-in support for sending data to message brokers, to webservices, or even to another database. You could even use Camel's Kafka support to publish the change records to a Kafka topic. However, the Kafka Connect framework might provide a more elegant way of using Kafka with Debezium.
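For illustration only -- the topic name and broker address below are placeholders, and a real application would choose a deliberate serialization format rather than relying on Struct.toString() -- a Kafka-bound version of the route might look like this:

from ("debezium-postgres:my_postgres_endpoint?...")
  .convertBodyTo (String.class) // crude serialization via Struct.toString()
  .to ("kafka:db-changes?brokers=localhost:9092");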