Debezium and Quarkus: Change Data Capture Patterns to Avoid Dual-Writes Problems


In part 2 of this series, we learned about the integration between Apache Kafka Streams and Quarkus, where we developed a simple application producing events to a Kafka topic and consuming and processing them in real-time with Kafka Streams.

In that example, we simulated a Movies streaming company. We stored Movies in one Kafka topic and, in another Kafka topic, we held each occurrence when a user stopped watching a movie and captured the time it had been played. We post-processed these events in real-time to count the number of times a movie is played for more than 10 minutes.

The following figure shows the architecture of the application:

But in that example, all the information was stored in Kafka topics, and that's unlikely to be the case in a real-world project.

Movie information is probably stored in a traditional database, perhaps fronted by a distributed cache to speed up queries, or indexed in a search engine. But let's keep things simple and assume that movie information is stored in a database.

This raises the question of how we can maintain the same data in two different systems, in the database as the primary place and in the Kafka movies topic to process data with Kafka Streams.

This article will teach you how to correctly maintain the same data in these different forms.

The first thing that may come to mind to fix this problem is the dual-writes approach. It's a straightforward approach, as your application code is responsible for maintaining the data in all places. For example, an insert of a new Movie should execute an insert into the database and fire an event to the Kafka topic.

In terms of code, this could be something like:
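A minimal sketch, assuming a Movie Panache entity like the one defined later in this article and a SmallRye Reactive Messaging channel named movies (names are illustrative, not the article's exact code):

```java
import javax.transaction.Transactional;
import javax.ws.rs.POST;
import javax.ws.rs.Path;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/movie")
public class MovieResource {

    // Emitter sending payloads to the "movies" Kafka channel
    @Channel("movies")
    Emitter<Movie> movieEmitter;

    @POST
    @Transactional // covers only the database write; the Kafka send is independent
    public Movie insert(Movie movie) {
        movie.persist();          // write 1: insert into the database
        movieEmitter.send(movie); // write 2: fire an event to the Kafka topic
        return movie;
    }
}
```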

This looks correct and is easy to implement, and it works, until you start running into some subtle issues. Let's explore some of them:

If data is persisted in the database but the send to the Kafka topic fails, you could wrap both operations in a transaction block. This fixes the atomicity problem because there will be a rollback in case of an error, but you pay a big price in performance: the bigger the transaction scope, the longer you block the database. And it still doesn't fix concurrency.

What happens if two concurrent users want to update the same Movie entry simultaneously? If the first request updates the database and sends its event to Kafka, and only then the second request does the same, the database and the Kafka topic stay aligned. But suppose the first request only executes its database persist, then the second request runs both its persist and its send to Kafka, and only afterward does the first request send its event to the Kafka topic. At this point, the database holds the second update while the Kafka topic ends with the first one, so the Movie data in the two systems has diverged, leading to inconsistencies. Of course, you could synchronize the whole method, but that would mean a huge performance loss.

This last problem occurs because of the nature of mixing different systems; a database transaction ensures consistency within its own persistence layer but not between systems.

One possible solution to this problem is to use the 2-Phase Commit protocol. Although this could be a good solution, two problems are present:

- Not every system supports distributed transactions; in this specific case, Apache Kafka doesn't support them.
- The protocol requires a coordinator and holds locks across all participants until everyone commits, which adds latency and hurts throughput.

So 2-Phase Commit is a possible solution, but not a generic one; let's explore another solution.

Change Data Capture (CDC) is a pattern used to track data that has changed (i.e., new entries added, records updated, etc.) and trigger an event, making it possible for the application to react to the change.

There are several ways of implementing CDC, for example, using timestamps, versions, or status indicators at the row level and periodically checking for elements past a specific point (e.g., SELECT all elements WHERE status=not_read). But this approach has drawbacks: you are regularly querying the database for no business purpose, and deletions of entries are hard to detect.

Another option is using database triggers, i.e., any change fires a trigger that stores the event in a specific event table. This works, and you can capture any change, but you are still periodically polling the event table.

Most databases have a transaction log that records all changes made to the database. Log scanners scan this log and capture any change in a non-intrusive way. The benefits of this approach are:

- All changes are captured, including deletions, which row-level polling misses.
- No periodic queries are needed; changes are read from the log as they are committed.
- It is non-intrusive: no schema changes or triggers are required, and the overhead on the database is minimal.
- Changes are captured in transaction commit order.

Log scanners are the best approach, and one of the most popular open-source projects is Debezium.

Debezium is an open-source project for change data capture using the log scanner approach. Start the database, and configure Debezium to consume data from the transaction log of that database. From this point on, for every insert, delete, or update committed to the database, Debezium will trigger an event, so an application can subscribe to it and react accordingly.

But why do Debezium, CDC, and Kafka help us fix the dual-writes problem? An Apache Kafka topic is composed of one or more partitions. Each partition orders events in arrival order (events are always appended at the end of the partition). So if we want to maintain the order of concurrent operations (to avoid having misplaced data between systems), a Kafka topic resolves this part of the problem.

But of course, we still have the other part: reading from the database in the correct order in the case of concurrent operations. The CDC and log scanner approaches ensure that the contents are in the correct order after transaction commitment and are non-intrusive. Debezium makes this possible.

You can operate Debezium in two different ways, and both are valid depending on the use case. These two methods are the Debezium Server and the Debezium Engine (embedded).

Debezium Server runs Debezium as a Kafka Connect instance. Kafka Connect is a standalone process that can be started as a consumer and/or producer to read data from and write data to Kafka. Kafka Connect defines connectors to different data systems and then moves large data sets into and out of Kafka. Since connectors use the Kafka API, they are scalable, fault-tolerant, and low-latency.

Consider the following example: you want to export content from one Kafka topic to an index engine like Elasticsearch. You have two options:

- Write a custom application that consumes the topic and indexes every document using the Elasticsearch client.
- Use a ready-made Kafka Connect Elasticsearch sink connector that streams the topic into the index for you.

Debezium does the same, but reads the transaction log from a database and sends the content to a Kafka topic.

One of the great things about Debezium is that it can connect to several databases, such as MySQL, MongoDB, PostgreSQL, Oracle Database, SQL Server, Db2, Cassandra, and Vitess.

The usual way to run Debezium is through Debezium Server, as it’s not intrusive to the application; it’s a service that takes data changes and populates a Kafka topic.

But not all applications require the level of fault tolerance or scalability offered by Kafka Connect. Also, sometimes the application must capture the data change event and execute some custom logic rather than sending the change to a messaging system, or must send it to a messaging system that isn't supported.

In these cases, a debezium-api module defines a small API to embed the Debezium Engine in the application.

So far, we've learned that dual writes are something to avoid. The solution uses Change Data Capture to get data directly from the transaction log and push it to a Kafka topic, so any other system can consume it in a "transactional" way and in order.

If you arrived at this point, you might wonder: "OK, nice, I can use CDC to react to data changes, but I'm exposing the internal entity to external systems." While this is true, allow me to introduce the Outbox Pattern to avoid this problem.

The Outbox Pattern provides an outbox table where you record all the operations on your entities (possibly with denormalized data). The CDC system (Debezium in our case) then reacts to changes placed in the outbox table rather than the entity table, keeping the data model isolated from other systems.

The important thing to be aware of is that the entity modification and the outbox record must be written within the same transaction.

Let's start putting all these pieces together in a Quarkus project and fix the problem we introduced at the beginning: how to insert a movie into the database while also propagating it to an external system (a Kafka topic).

Instead of handcrafting code for each use case, let’s see how to use Debezium Embedded and how it’s integrated with Quarkus to solve this problem.

Navigate to the Quarkus start page and select the RESTEasy Reactive and RESTEasy Reactive Jackson extensions (to implement JAX-RS endpoints and marshal/unmarshal events between JSON, Java objects, and byte arrays), Panache and the MySQL driver (to insert movies into the MySQL database), and SmallRye Reactive Messaging (to interact with Kafka). Also, uncheck the Starter Code generation option.

The following screenshot shows the selection:

You can skip this manual step and navigate to the Quarkus Generator link, where all the dependencies are selected. Then push the Generate your application button to download the scaffolded application’s zip file.

Unzip the file and open the project in your favorite IDE.

Before we start to code, we need to add two new dependencies: one for using the Debezium Engine and another for adding the Debezium Quarkus Outbox extension.

Open the pom.xml file and add the following dependencies.
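A sketch of the required dependencies; debezium-embedded and debezium-connector-mysql are the relevant artifacts, and the version shown is illustrative for this Quarkus generation, so adjust it to your setup:

```xml
<!-- Debezium Engine embedded in the application -->
<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-embedded</artifactId>
  <version>1.9.4.Final</version>
</dependency>
<!-- Connector for the database we monitor (MySQL in this article) -->
<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-connector-mysql</artifactId>
  <version>1.9.4.Final</version>
</dependency>
```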

This is to use the Debezium Engine embedded in the application. None of these dependencies would be required if we used Debezium Server since it’s a standalone service.

Quarkus integrates with the Outbox Pattern through the Debezium Quarkus Outbox extension. 

Open the pom.xml file and add the following dependencies.
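One plausible form, importing the quarkus-debezium-bom platform member so the extension version is managed for you; treat the exact coordinates as an assumption and check the Debezium documentation for your versions:

```xml
<!-- In <dependencyManagement>: BOM aligned with the Quarkus platform version -->
<dependency>
  <groupId>io.quarkus.platform</groupId>
  <artifactId>quarkus-debezium-bom</artifactId>
  <version>2.10.1.Final</version>
  <type>pom</type>
  <scope>import</scope>
</dependency>

<!-- In <dependencies>: the Outbox extension itself, version managed by the BOM -->
<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-quarkus-outbox</artifactId>
</dependency>
```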

Note that the version of the BOM is aligned with the Quarkus version, 2.10.1.Final in this case.

You can choose not to use the Outbox Pattern or implement it yourself; in this case, none of these dependencies are required. But we’ll use it to simplify the development.

With all these dependencies in place, create the Movie entity annotated with JPA annotations and extend the PanacheEntity class:
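A minimal sketch of the entity; the field names are illustrative:

```java
import javax.persistence.Entity;

import io.quarkus.hibernate.orm.panache.PanacheEntity;

// With Panache, the id field and basic CRUD operations come from PanacheEntity.
@Entity
public class Movie extends PanacheEntity {
    public String name;
    public String director;
    public String genre;
}
```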

The next step is to create an HTTP endpoint to insert the movie content into the database using JAX-RS annotations:
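A minimal sketch; the /movie path and the method names are illustrative, and the endpoint delegates to the MovieService shown below:

```java
import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;

@Path("/movie")
public class MovieResource {

    @Inject
    MovieService movieService;

    // Inserts the movie and returns the persisted entity
    @POST
    public Response insert(Movie movie) {
        Movie persisted = movieService.insertMovie(movie);
        return Response.ok(persisted).build();
    }
}
```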

Since we are using the Debezium Quarkus Outbox extension, we need to create an entity representing the content stored in the outbox table. This entity must implement the ExportedEvent interface, providing the required methods that identify the kind of event put in the outbox table.
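A sketch of such an event for movie creation, implementing ExportedEvent with a String aggregate id and a JSON string payload; the aggregate type, event type, and payload layout are illustrative:

```java
import java.time.Instant;

import io.debezium.outbox.quarkus.ExportedEvent;

public class MovieEvent implements ExportedEvent<String, String> {

    private final long movieId;
    private final String payload;
    private final Instant timestamp;

    public MovieEvent(Movie movie) {
        this.movieId = movie.id;
        this.payload = toJson(movie);
        this.timestamp = Instant.now();
    }

    // Naive JSON rendering of the movie, enough for this example
    private static String toJson(Movie movie) {
        return String.format("{\"id\": %d, \"name\": \"%s\"}", movie.id, movie.name);
    }

    @Override
    public String getAggregateId() {
        return String.valueOf(movieId); // groups events belonging to the same Movie
    }

    @Override
    public String getAggregateType() {
        return "Movie";
    }

    @Override
    public String getType() {
        return "MovieCreated";
    }

    @Override
    public Instant getTimestamp() {
        return timestamp;
    }

    @Override
    public String getPayload() {
        return payload;
    }
}
```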

The last step before adding the Debezium logic is to implement the MovieService class with the insert logic. This logic should persist the movie into the Movie table and the MovieEvent entity into the OutboxEvent table managed by the extension.

The extension provides a specific CDI event to persist an event that implements the ExportedEvent interface. The only thing to do is fire an event, and the data is automatically persisted.
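A sketch of the service; the key point is that persisting the entity and firing the CDI event (which the extension turns into an outbox row) happen within the same transaction:

```java
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import javax.transaction.Transactional;

import io.debezium.outbox.quarkus.ExportedEvent;

@ApplicationScoped
public class MovieService {

    // The outbox extension observes ExportedEvent CDI events
    @Inject
    Event<ExportedEvent<?, ?>> event;

    @Transactional
    public Movie insertMovie(Movie movie) {
        movie.persist();                 // insert into the Movie table
        event.fire(new MovieEvent(movie)); // extension inserts into OutboxEvent
        return movie;
    }
}
```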

The final step is configuring the Debezium Engine and starting it embedded within the application.

To configure the engine, you need to set the database information (hostname, port, credentials), and the database and tables Debezium should monitor to trigger events.

The DebeziumListener CDI class starts Debezium when the application is up and running.

The Debezium Engine doesn't run in a separate thread on its own, so we need to provide a thread for it to run in parallel without blocking the application thread. Using a ManagedExecutor is the correct way to obtain an executor thread within Quarkus to run Debezium.

Then we need to instantiate the Debezium Engine using the DebeziumEngine class, setting the configuration properties created in the previous step. One of the most important steps is registering a method that is triggered every time Debezium generates an event. The notifying method registers this custom method; in our example, we named it handleChangeEvent.

This method receives each change event, and we can implement any logic we wish: send it to a Kafka topic, manipulate it and forward it to another service, anything you can implement in Java.
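Putting these pieces together, a condensed sketch of such a listener might look like the following. The connection values are hard-coded placeholders (with Dev Services you would read them from the active datasource configuration), and names like movies-connector and the handler body are illustrative:

```java
import java.io.IOException;
import java.util.Properties;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;

import org.eclipse.microprofile.context.ManagedExecutor;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;

@ApplicationScoped
public class DebeziumListener {

    @Inject
    ManagedExecutor executor;

    private DebeziumEngine<ChangeEvent<String, String>> engine;

    void onStart(@Observes StartupEvent event) {
        // Database connection details and the tables Debezium should monitor;
        // placeholder values, normally resolved from configuration.
        Properties props = new Properties();
        props.setProperty("name", "movies-connector");
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore");
        props.setProperty("database.history", "io.debezium.relational.history.MemoryDatabaseHistory");
        props.setProperty("database.hostname", "localhost");
        props.setProperty("database.port", "60652");
        props.setProperty("database.user", "root");
        props.setProperty("database.password", "test");
        props.setProperty("database.dbname", "test");
        props.setProperty("database.server.id", "10181");
        props.setProperty("database.server.name", "movies-db-server");
        props.setProperty("table.include.list", "test.OutboxEvent");

        // Build the engine and register the callback invoked for every change event
        engine = DebeziumEngine.create(Json.class)
                .using(props)
                .notifying(this::handleChangeEvent)
                .build();

        // The engine blocks its thread, so run it on a managed executor thread
        executor.execute(engine);
    }

    private void handleChangeEvent(ChangeEvent<String, String> changeEvent) {
        // Any custom logic goes here, e.g., forwarding the payload to a Kafka topic
        System.out.println("Received change event: " + changeEvent.value());
    }

    void onStop(@Observes ShutdownEvent event) throws IOException {
        if (engine != null) {
            engine.close(); // stop the engine gracefully on shutdown
        }
    }
}
```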

This example is self-contained, so you don’t need to start anything as Quarkus will do it for you.

The Panache and Kafka connectors integrate with Quarkus Dev Services. For this reason, we don't need to start a Kafka cluster or a MySQL database, nor configure them, as Quarkus Dev mode takes care of everything. Just remember to have a working container runtime on your computer, such as Podman or any other OCI-compliant tool.

Before running the application, we’ll add two configuration properties to the application to make things more traceable; in the application.properties file, add the following lines:
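For example (the first property comes from Hibernate ORM; the second is, to my understanding, the Debezium Quarkus Outbox extension's switch for keeping consumed rows):

```properties
# Log every SQL statement Hibernate executes
quarkus.hibernate-orm.log.sql=true

# Keep rows in the outbox table instead of removing them after insertion
quarkus.debezium-outbox.remove-after-insert=false
```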

The first line logs the SQL statements executed against the database. This is useful to validate that data is inserted into both tables (Movie and OutboxEvent).

The second one prevents the outbox extension from deleting data from the outbox table after it's consumed, so we can inspect it.

In one terminal window, start the service:
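With the Maven wrapper generated by the starter:

```shell
./mvnw quarkus:dev
```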

After a few seconds, a Kafka cluster, MySQL instance, and the application are up and running.

Inspect the running containers to validate instances:
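For example, with Docker (use podman ps if you run Podman):

```shell
docker ps
```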

The Kafka cluster runs at port 55002 and MySQL (container ID 4c220f7ee066) at port 60652.

NOTE: Ports and IDs might be different in each case.

In another terminal window, run the curl command to insert a new Movie.
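For example, with the endpoint path and payload fields following the illustrative code above:

```shell
curl -X POST -H "Content-Type: application/json" \
  -d '{"name": "The Terminal", "director": "Steven Spielberg", "genre": "Comedy"}' \
  localhost:8080/movie
```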

Inspect the Quarkus terminal window to see the SQL statements run against the database: there should be an insert into the Movie table and another into the OutboxEvent table.

To validate that Debezium detects the change and pushes it to the Movies Kafka topic, run the kcat tool to query a Kafka topic, setting the exposed port of the service.
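For example, against the exposed broker port seen above (the topic name is an assumption; use whatever topic your handler produces to):

```shell
kcat -b localhost:55002 -t movies -C -o beginning
```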

We’ve implemented a solution that fixes the dual writes problem between a database and an external system by using Debezium to read transaction logs and trigger an event for every change.

In this example, we used Debezium Embedded, and we implemented the logic to execute when an event was fired. 

The embedded approach might work in some scenarios, but in others (especially in brownfield projects or projects requiring high scalability and fault tolerance), Debezium Server might be a better fit. With Debezium Server (as a Kafka Connect process), no change in your code is required (no embedded dependencies), as Debezium is a standalone process connecting to a database transaction log, detecting the changes, and sending them to a Kafka topic. Since events are ordered, any system can consume these changes from the topic.

Although the Outbox Pattern is not mandatory when using Debezium (after all, Debezium can listen for changes in any table), it's a good practice to isolate your data, and the Outbox Pattern helps you with this.

Integrating (micro)service architectures might seem easy initially, but when you start integrating data, things become more complex, and the Debezium project is here to help you with this task.

Source code is available on GitHub.


