Back
December 17, 2024
12
min read

Aggregating Change Data Capture Events based on Transactional Boundaries

Data pipelines built on top of change data capture (CDC) are gaining ever more traction and power many different real-time applications these days. The standard way CDC solutions operate is to propagate captured data changes as separate events, which are typically consumed one by one and as is by downstream systems. This article explores CDC pipelines in the context of transactional systems and discusses how the direct consumption of individually published CDC events impacts data consistency at the sink side. In particular, it highlights why the lack of transactional boundaries in change event streams may well lead to temporarily inconsistent state and shows how to tackle this often neglected problem by means of transaction-aware aggregation of CDC events.

CDC 101 with Debezium

To begin with, let’s quickly recap on a high-level, what the default behavior for change data capture (CDC) with Debezium looks like when no customized configuration is in place. After taking an optional initial snapshot of the existing data, Debezium will continuously capture any data modifications per database table as individual change events, one for each row change.  By default, Debezium changes events are propagated on a per database table basis. For instance, when working with Flink SQL and a Flink CDC connector based on the Debezium Engine, change events typically end up in separate Flink tables. Also, if Debezium is deployed in the context of Kafka and Kafka Connect, there is a 1:1 relationship between the captured database tables and the corresponding Kafka topics. The latter setup is depicted below and will be assumed for the running example throughout the rest of this article.

Understanding CDC Event Consistency

While this standard way of capturing and propagating CDC events serves many use cases perfectly fine, there is one important aspect to highlight which tends to get overlooked. The fact that all CDC events are published individually implies that there are no transactional boundaries which could be considered downstream. When building a streaming data pipeline and feeding these change events into a target data store, all that’s guaranteed is that the data will eventually become consistent.

In the light of transactional processing in the source database it may well happen to temporarily see intermediary results downstream. Chances are that only a subset of several changes—which originally happened within a single transaction—are reflected in the sink system at the time of querying it. The consequences of this depend on the use case and vary from basically negligible to completely unacceptable. In other words, it boils down to the question of whether or not you are willing to tolerate temporarily inconsistent state for downstream consumers of CDC events.

Source Database Transactions in Action

Let’s look at a concrete example to make this problem more tangible. The CDC source is a MySQL database where the following transaction is executed to insert a new customer record together with a corresponding address record:

START TRANSACTION;
INSERT INTO inventory.customers VALUES
   (default, 'Issac', 'Fletcher', 'ifletcher@example.com');
SET @customer_id = LAST_INSERT_ID();
INSERT INTO inventory.addresses VALUES
   (default, @customer_id, '1234 Nowhere Street', 'Great City', 'SomeState', '12345', 'LIVING');
COMMIT;

With the typical CDC setup for Debezium, both INSERT operations would be captured individually and published separately, for instance, into two different Kafka topics.

‍

Given that these two CDC events are then written into a sink system we have by default no way to make sure that both of them get applied atomically. The bottom line is that we might see one of the following four outcomes when querying the sink system:

  • âś… Both records are missing: This is valid state and fine, telling us that none of the two CDC events made it into the sink yet
  • ❌ Customer record is available but address record still missing: This represents intermediary state which can be considered invalid since this state never existed in the source database
  • ❌ Address record is available but customer record still missing: This also represents intermediary state which is inconsistent and even more questionable as the address points to a customer that doesn’t exist yet. As a result, enforcing foreign keys in the sink data store would not be possible either.
  • âś… Both records are available: this is strictly speaking the only valid state for propagated CDC events referring to this specific transaction

Transactional CDC Event Aggregation

Let's tackle the problem explained above and figure out how to work with CDC event streams in the context of transactional systems without suffering from a temporarily invalid state at the sink side.

What do we want to achieve?

Our goal is to find a way for the individual change events which originally belonged to the same database transaction to get aggregated accordingly. In the context of this article the desired outcome can be referred to as transactional buffering of CDC events and it conceptually looks as follows:

Once any such transactional (TX) buffer is complete, it can be propagated as a whole for downstream consumption of bespoke consumers, thereby avoiding the problem of temporarily invalid state. The rationale behind this approach is to support use cases which require a stricter level of consistency at the consumption side of CDC event streams. Provided that there is a sink system in place which itself supports the notion of transactions, a TX buffer can be atomically applied to mimic the transactional semantics at the source side of the CDC pipeline.

What’s needed for this to work?

In order to successfully aggregate data change events in alignment with transactional boundaries at the data source, the CDC tool in use must provide additional pieces of information reflecting the metadata about transactions. Generally speaking, it’s required to know the following:

  • when a transaction starts and ends respectively
  • how many change events per different table belong to a specific transaction
  • a transaction identifier as part of each change event to assign it to the proper transaction

All of the necessary metadata bits can be provided by Debezium if the connector in question actually supports it. Using the default configuration though, Debezium connectors do not expose transaction metadata, so the first step is to set the corresponding property <span class="inline-code">provide.transaction.metadata=true</span> as is documented e.g. for the MySQL connector here. Setting this configuration results in:

  1. An additional, dedicated Kafka topic gets created which stores all transaction-related BEGIN and END markers. Besides some common data such as the transaction id, a timestamp, and the marker type, END markers also store which tables were affected and how many rows per table were modified as part of the transaction in question. This is vital to be able to know when all CDC events for a single transaction have been seen and successfully buffered.
  1. Each payload of a change event will be enriched with a transaction field which itself contains three pieces of information: the transaction id, the order of the change event among all events emitted for the specific table, the total order i.e. the absolute position of this change event among all events originating from the transaction.

Let’s refer to the example from above and the one transaction which inserts a new customer record together with a corresponding address record.

Below are JSON snippets showing actual transaction marker events plus the CDC events that represent the two insert operations into different tables as part of the same transaction.

Actual Transaction Markers in Detail

  • BEGIN event: holds common transaction information such as its ID (<span class="inline-code">id</span> field), the <span class="inline-code">status</span>, and a timestamp, <span class="inline-code">ts_ms</span>. The two null fields (<span class="inline-code">event_count</span> and <span class="inline-code">data_collections</span>) are set for END markers only. Note that the value used for a transaction’s ID can be any uniquely addressable value. The actual value Debezium uses depends on the connector plugin. For MySQL it’s composed of the binary log’s file name and the log position in the file.
{
       "status": "BEGIN",
       "id": "file=binlog.000003,pos=236",
       "event_count": null,
       "data_collections": null,
       "ts_ms": 1731660981000
}
  • END event: contains the metadata signaling that this transaction in total created two change events (<span class="inline-code">"event_count": 2</span>). One change event for the <span class="inline-code">customers</span> table and one for the <span class="inline-code">addresses</span> table (<span class="inline-code">data_collections</span> field). Debezium uses the more generic term <span class="inline-code">data_collection</span> so as to also make it applicable to non-relational data stores which might not use the term <span class="inline-code">table</span>.
{
       "status": "END",
       "id": "file=binlog.000003,pos=236",
       "event_count": 2,
       "data_collections": [
           {
               "data_collection": "inventory.customers",
               "event_count": 1
           },
           {
               "data_collection": "inventory.addresses",
               "event_count": 1
           }
       ],
       "ts_ms": 1731660981000
}

Actual Change Events in Detail

  • INSERT event for customers table: typical Debezium create change event (<span class="inline-code">"op": "c"</span>) but enriched with the information about the transaction id (<span class="inline-code">"id": "file=binlog.000003,pos=236"</span>), the order of this event among others within the customers table (<span class="inline-code">"data_collection_order": 1</span>), and the total order of this event as part of the whole transaction (<span class="inline-code">"total_order": 1</span>).
{
       "before": null,
       "after": {
           "id": 1005,
           "first_name": "Issac",
           "last_name": "Fletcher",
           "email": "ifletcher@example.com"
       },
       "source": {
           ...
       },
       "transaction": {
           "id": "file=binlog.000003,pos=236",
           "total_order": 1,
           "data_collection_order": 1
       },
       "op": "c",
       "ts_ms": 1731660982008,
       "ts_us": 1731660982008282,
       "ts_ns": 1731660982008282755
}
  • INSERT event for addresses table: typical Debezium create change event (<span class="inline-code">"op": "c"</span>) but enriched with the information about the transaction id (<span class="inline-code">"id": "file=binlog.000003,pos=236"</span>), the order of this event among others within the addresses table (<span class="inline-code">"data_collection_order": 1</span>), and the total order of this event as part of the whole transaction (<span class="inline-code">"total_order": 2</span>).
{
       "before": null,
       "after": {
           "id": 17,
           "customer_id": 1005,
           "street": "1234 Nowhere Street",
           "city": "Great City",
           "state": "SomeState",
           "zip": "12345",
           "type": "LIVING"
       },
       "source": {
           ...
       },
       "transaction": {
           "id": "file=binlog.000003,pos=236",
           "total_order": 2,
           "data_collection_order": 1
       },
       "op": "c",
       "ts_ms": 1731660982009,
       "ts_us": 1731660982009759,
       "ts_ns": 1731660982009759380
}

Experimental Implementation

Let’s think about some of the challenges we would need to address if we wanted to build a stream processing job to aggregate individual CDC events belonging to the same database transaction into one coherent buffer. To reiterate that, the goal is to enable downstream systems to then atomically apply these CDC event buffers and thereby mirror the source database's transactional integrity.

Challenging Aspects for TX Buffering

  • Transactions can affect an arbitrary number of tables and rows respectively.
  • Metadata and CDC events for a transaction are published as separate streams. At least two streams—in general N+1 streams where N is the number of different tables involved—need to be correlated.
  • Buffering CDC events with a stream processor across multiple tasks / machines requires proper data (re)partitioning such that all events belonging to the same transaction can be processed on one specific task / machine.
  • Since CDC events are scattered across several topics and partitions, consuming them in general happens in a non-deterministic sequence as events are read from different streams in parallel.
  • The original transaction order must be restored, since the completion of buffers for the respective transactions is not guaranteed to happen in the same sequence. A large transaction (TX 1) which finished first in the database can take longer to buffer than a small transaction (TX 2) which finished second in the database. In this case, the completed buffer for TX 2 must be retained and cannot be emitted until after TX 1—which came first in the database—was fully processed.

Proof of Concept Flink Job

Let’s do a quick thought experiment to understand how a Flink job for transactional buffering of CDC events could look like on a high-level.

Job Input

It all starts with defining the job’s input data which is read using the KafkaSource connector for Flink’s DataStream API. The first input (Kafka Source 1) reads from all relevant CDC topics plus the transaction metadata topic into a DataStream. Then there is a second input (Kafka Source 2) which only reads from the transaction metadata topic and is used to create a BroadcastStream.

Job Processing

All events coming in via the Kafka Source 1 get keyed by the transaction identifier which results in a KeyedStream. This is done to ensure that all events get properly partitioned such that everything belonging to the same transaction ends up on the same Flink task manager instance when doing parallel processing. Kafka Source 2 backs a BroadcastStream and is used to maintain broadcast state that is accessible on every parallel processing instance.

With both input streams defined, the keyed stream and broadcast stream are then connected to be further processed with the custom transaction buffering logic that is expressed by means of Flink’s KeyedBroadcastProcessFunction for which there are three essential methods to be implemented:

processElement(...)

This method is called for each event in the keyed stream which includes both standard CDC events as well as transaction metadata events. Hence, the event type decides how to process the respective payload. The key representing the transaction identifier is used to check if a new TX buffer needs to be created or the current event can be added to the existing one. The TX buffer is maintained as keyed state. Whenever a completed TX buffer is detected, a timer is registered to perform a chronology check and decide if the completed TX buffer can be emitted, which is done based on the broadcast state.

processBroadcastElement(...)

Is called for each transaction metadata in the broadcast stream. The primary responsibility of this method is to perform all the bookkeeping with regard to the chronology of incoming transactions by using broadcast state. Additionally, this method reacts to custom metadata events which are sent to signal successfully emitted TX buffers. The fact that transaction chronology is maintained as broadcast state allows all task manager instances to locally interact with their respective copy of said state.

onTimer(...)

When a previously registered timer for a completed TX buffer fires, its main purpose is to perform a chronology check for a given transaction identifier by consulting the broadcast state. Any specific TX buffer is safe to emit whenever there is no pending i.e. completed but not yet emitted TX buffer for a transaction identifier which came before the one in question. Depending on the outcome of this check the TX buffer for a given transaction identifier gets either emitted and cleared from the keyed state, or another timer is registered to revisit the chronology check for the same transaction identifier.

Job Output

The job’s main output are the TX buffer events which contain all individual CDC events belonging to the corresponding database transaction. An instance of the KafkaSink connector (Kafka Sink 1) is defined to write these TX buffers into a dedicated output topic. This is what bespoke downstream systems can consume from to process the buffered CDC events accordingly. In addition to emitting the TX buffer itself, a custom metadata event is written to a side output. This custom event is injected back into the transaction metadata topic via another Kafka sink. Doing so is necessary to be able to react to the fact that a TX buffer was successfully emitted by updating the transaction-related bookkeeping (see processBroadcastElement(...) of the KeyedBroadcastProcessFunction) that uses broadcast state under the covers.

Potential Payload Structure for TX Buffers

While CDC events and TX metadata events are well-defined and standardized—in this case by the Debezium project—the exact payload structure of a TX buffer can essentially be designed at will and depends on the respective implementation. By combining the information contained in the transaction metadata and the individual change events, the basic structure of such a TX buffer—referring to the example used throughout this article—could potentially look like this:

{
   "beginMarker": {
"id": "file=binlog.000003,pos=236",
"status": "BEGIN",
"event_count": 0,
"data_collections": null,
"ts_ms": 1732178892000
   },
   "endMarker": {
       "id": "file=binlog.000003,pos=236",
       "status": "END",
       "event_count": 2,
       "data_collections": [
           {
               "data_collection": "inventory.customers",
               "event_count": 1
           },
           {
               "data_collection": "inventory.addresses",
               "event_count": 1
           }
       ],
       "ts_ms": 1732178892000
   },
   "buffer": {
       "inventory.customers": [
           {
               "key": "FULL_CDC_EVENT_KEY_PAYLOAD_HERE",
               "value": "FULL_CDC_EVENT_VALUE_PAYLOAD_HERE"
           }
       ],
       "inventory.addresses": [
           {
              "key": "FULL_CDC_EVENT_KEY_PAYLOAD_HERE",
              "value": "FULL_CDC_EVENT_VALUE_PAYLOAD_HERE"
           }
       ]
   }
}

With TX buffers like this written into a dedicated Kafka output topic, bespoke downstream consumers which retrieve one such buffer full of CDC events for a specific transaction and want to apply its contents atomically then need to:

  1. Disassemble the TX buffer by creating respective SQL statements according to the change event type
  2. Start a transaction
  3. Apply all INSERT / UPDATE / DELETE statements derived from the TX buffer
  4. Commit the transaction

The experimental code base for the Flink job is available in our GitHub examples repository.

Limitations and Alternatives

While the experimental implementation to aggregate CDC events is rather flexible and should be able to cope with basic CRUD workloads, it's generally unsuited to support arbitrarily large database transactions. If, for instance, several hundred thousand table rows get changed within a single database transaction, the corresponding CDC event buffer would end up being rather large. It might easily exceed certain limits such as the maximum message size for records written into the Kafka output topic. One way to work around this problem would be to directly feed TX buffers into a target system rather than producing them into a Kafka output topic. This could be achieved using a bespoke Flink DataSink implementation that understands the different semantics for a TX buffer’s contents and can atomically process all contained events.

Besides a custom stream processing job, what about other implementation strategies to achieve the same thing?

First, there is the outbox pattern which might allow us to circumvent the problem of dealing with individual change events altogether. If transactional scope only matters for pre-defined aggregate structures—customers with their addresses or purchase orders with their order lines—there would be no need to communicate individual change events and reassemble them according to transactional boundaries. Instead, any changes for a single aggregate structure would happen transactionally at the source side and the outbox table could be used to communicate complete aggregates to the outside world. Then, any client would only ever get to see valid states for a single aggregate in question. Of course, the scope of such an implementation is considerably narrower and lacks the flexibility to support transactional semantics for any changes beyond pre-defined aggregate structures.

A second alternative for buffering multiple individual change events in the context of Kafka Connect could be to write a custom single message transformation (SMT). If the SMT gets to see transaction markers and every individual change event in order, it can buffer all CDC events falling between a begin and end marker and emit completed buffers on the fly. It is worth noting that a naive approach like this would effectively need unbounded memory and cannot provide any reasonable fault tolerance concerning the event buffer itself, as all operations happen in memory.

Summary

This article discussed the challenge of maintaining data consistency when processing CDC events from transactional systems such as relational databases. It highlighted how the lack of transactional boundaries in CDC event streams can temporarily result in invalid state—such as partial updates from multi-table transactions—that never existed in the source database. To mitigate this issue, the aggregating of CDC events based on their original transactional context was proposed as one solution. In order for this to work, Debezium can be configured to enrich data change events with all the required transaction-related metadata.

Finally, an experimental stream processing job using Apache Flink was briefly explored to demonstrate the practical aspects of this approach. The job's main output is a stream of transaction-aware CDC event buffers that downstream systems can consume and process, thereby effectively mimicking the source database's transactional integrity at the sink side.

📫 Email signup 👇

Did you enjoy this issue of Checkpoint Chronicle? Would you like the next edition delivered directly to your email to read from the comfort of your own home?

Simply enter your email address here and we'll send you the next issue as soon as it's published—and nothing else, we promise!

đź‘Ť Got it!
Oops! Something went wrong while submitting the form.
Hans-Peter Grahsl

Hans-Peter Grahsl is a Staff Developer Advocate at Decodable. He is an open-source community enthusiast and in particular passionate about event-driven architectures, distributed stream processing systems and data engineering. For his code contributions, conference talks and blog post writing at the intersection of the Apache Kafka and MongoDB communities, Hans-Peter received multiple community awards. He likes to code and is a regular speaker at developer conferences around the world.

Data pipelines built on top of change data capture (CDC) are gaining ever more traction and power many different real-time applications these days. The standard way CDC solutions operate is to propagate captured data changes as separate events, which are typically consumed one by one and as is by downstream systems. This article explores CDC pipelines in the context of transactional systems and discusses how the direct consumption of individually published CDC events impacts data consistency at the sink side. In particular, it highlights why the lack of transactional boundaries in change event streams may well lead to temporarily inconsistent state and shows how to tackle this often neglected problem by means of transaction-aware aggregation of CDC events.

CDC 101 with Debezium

To begin with, let’s quickly recap on a high-level, what the default behavior for change data capture (CDC) with Debezium looks like when no customized configuration is in place. After taking an optional initial snapshot of the existing data, Debezium will continuously capture any data modifications per database table as individual change events, one for each row change.  By default, Debezium changes events are propagated on a per database table basis. For instance, when working with Flink SQL and a Flink CDC connector based on the Debezium Engine, change events typically end up in separate Flink tables. Also, if Debezium is deployed in the context of Kafka and Kafka Connect, there is a 1:1 relationship between the captured database tables and the corresponding Kafka topics. The latter setup is depicted below and will be assumed for the running example throughout the rest of this article.

Understanding CDC Event Consistency

While this standard way of capturing and propagating CDC events serves many use cases perfectly fine, there is one important aspect to highlight which tends to get overlooked. The fact that all CDC events are published individually implies that there are no transactional boundaries which could be considered downstream. When building a streaming data pipeline and feeding these change events into a target data store, all that’s guaranteed is that the data will eventually become consistent.

In the light of transactional processing in the source database it may well happen to temporarily see intermediary results downstream. Chances are that only a subset of several changes—which originally happened within a single transaction—are reflected in the sink system at the time of querying it. The consequences of this depend on the use case and vary from basically negligible to completely unacceptable. In other words, it boils down to the question of whether or not you are willing to tolerate temporarily inconsistent state for downstream consumers of CDC events.

Source Database Transactions in Action

Let’s look at a concrete example to make this problem more tangible. The CDC source is a MySQL database where the following transaction is executed to insert a new customer record together with a corresponding address record:

START TRANSACTION;
INSERT INTO inventory.customers VALUES
   (default, 'Issac', 'Fletcher', 'ifletcher@example.com');
SET @customer_id = LAST_INSERT_ID();
INSERT INTO inventory.addresses VALUES
   (default, @customer_id, '1234 Nowhere Street', 'Great City', 'SomeState', '12345', 'LIVING');
COMMIT;

With the typical CDC setup for Debezium, both INSERT operations would be captured individually and published separately, for instance, into two different Kafka topics.

‍

Given that these two CDC events are then written into a sink system we have by default no way to make sure that both of them get applied atomically. The bottom line is that we might see one of the following four outcomes when querying the sink system:

  • âś… Both records are missing: This is valid state and fine, telling us that none of the two CDC events made it into the sink yet
  • ❌ Customer record is available but address record still missing: This represents intermediary state which can be considered invalid since this state never existed in the source database
  • ❌ Address record is available but customer record still missing: This also represents intermediary state which is inconsistent and even more questionable as the address points to a customer that doesn’t exist yet. As a result, enforcing foreign keys in the sink data store would not be possible either.
  • âś… Both records are available: this is strictly speaking the only valid state for propagated CDC events referring to this specific transaction

Transactional CDC Event Aggregation

Let's tackle the problem explained above and figure out how to work with CDC event streams in the context of transactional systems without suffering from a temporarily invalid state at the sink side.

What do we want to achieve?

Our goal is to find a way for the individual change events which originally belonged to the same database transaction to get aggregated accordingly. In the context of this article the desired outcome can be referred to as transactional buffering of CDC events and it conceptually looks as follows:

Once any such transactional (TX) buffer is complete, it can be propagated as a whole for downstream consumption of bespoke consumers, thereby avoiding the problem of temporarily invalid state. The rationale behind this approach is to support use cases which require a stricter level of consistency at the consumption side of CDC event streams. Provided that there is a sink system in place which itself supports the notion of transactions, a TX buffer can be atomically applied to mimic the transactional semantics at the source side of the CDC pipeline.

What’s needed for this to work?

In order to successfully aggregate data change events in alignment with transactional boundaries at the data source, the CDC tool in use must provide additional pieces of information reflecting the metadata about transactions. Generally speaking, it’s required to know the following:

  • when a transaction starts and ends respectively
  • how many change events per different table belong to a specific transaction
  • a transaction identifier as part of each change event to assign it to the proper transaction

All of the necessary metadata bits can be provided by Debezium if the connector in question actually supports it. Using the default configuration though, Debezium connectors do not expose transaction metadata, so the first step is to set the corresponding property <span class="inline-code">provide.transaction.metadata=true</span> as is documented e.g. for the MySQL connector here. Setting this configuration results in:

  1. An additional, dedicated Kafka topic gets created which stores all transaction-related BEGIN and END markers. Besides some common data such as the transaction id, a timestamp, and the marker type, END markers also store which tables were affected and how many rows per table were modified as part of the transaction in question. This is vital to be able to know when all CDC events for a single transaction have been seen and successfully buffered.
  1. Each payload of a change event will be enriched with a transaction field which itself contains three pieces of information: the transaction id, the order of the change event among all events emitted for the specific table, the total order i.e. the absolute position of this change event among all events originating from the transaction.

Let’s refer to the example from above and the one transaction which inserts a new customer record together with a corresponding address record.

Below are JSON snippets showing actual transaction marker events plus the CDC events that represent the two insert operations into different tables as part of the same transaction.

Actual Transaction Markers in Detail

  • BEGIN event: holds common transaction information such as its ID (<span class="inline-code">id</span> field), the <span class="inline-code">status</span>, and a timestamp, <span class="inline-code">ts_ms</span>. The two null fields (<span class="inline-code">event_count</span> and <span class="inline-code">data_collections</span>) are set for END markers only. Note that the value used for a transaction’s ID can be any uniquely addressable value. The actual value Debezium uses depends on the connector plugin. For MySQL it’s composed of the binary log’s file name and the log position in the file.
{
       "status": "BEGIN",
       "id": "file=binlog.000003,pos=236",
       "event_count": null,
       "data_collections": null,
       "ts_ms": 1731660981000
}
  • END event: contains the metadata signaling that this transaction in total created two change events (<span class="inline-code">"event_count": 2</span>). One change event for the <span class="inline-code">customers</span> table and one for the <span class="inline-code">addresses</span> table (<span class="inline-code">data_collections</span> field). Debezium uses the more generic term <span class="inline-code">data_collection</span> so as to also make it applicable to non-relational data stores which might not use the term <span class="inline-code">table</span>.
{
       "status": "END",
       "id": "file=binlog.000003,pos=236",
       "event_count": 2,
       "data_collections": [
           {
               "data_collection": "inventory.customers",
               "event_count": 1
           },
           {
               "data_collection": "inventory.addresses",
               "event_count": 1
           }
       ],
       "ts_ms": 1731660981000
}

Actual Change Events in Detail

  • INSERT event for customers table: typical Debezium create change event (<span class="inline-code">"op": "c"</span>) but enriched with the information about the transaction id (<span class="inline-code">"id": "file=binlog.000003,pos=236"</span>), the order of this event among others within the customers table (<span class="inline-code">"data_collection_order": 1</span>), and the total order of this event as part of the whole transaction (<span class="inline-code">"total_order": 1</span>).
{
       "before": null,
       "after": {
           "id": 1005,
           "first_name": "Issac",
           "last_name": "Fletcher",
           "email": "ifletcher@example.com"
       },
       "source": {
           ...
       },
       "transaction": {
           "id": "file=binlog.000003,pos=236",
           "total_order": 1,
           "data_collection_order": 1
       },
       "op": "c",
       "ts_ms": 1731660982008,
       "ts_us": 1731660982008282,
       "ts_ns": 1731660982008282755
}
  • INSERT event for addresses table: typical Debezium create change event (<span class="inline-code">"op": "c"</span>) but enriched with the information about the transaction id (<span class="inline-code">"id": "file=binlog.000003,pos=236"</span>), the order of this event among others within the addresses table (<span class="inline-code">"data_collection_order": 1</span>), and the total order of this event as part of the whole transaction (<span class="inline-code">"total_order": 2</span>).
{
       "before": null,
       "after": {
           "id": 17,
           "customer_id": 1005,
           "street": "1234 Nowhere Street",
           "city": "Great City",
           "state": "SomeState",
           "zip": "12345",
           "type": "LIVING"
       },
       "source": {
           ...
       },
       "transaction": {
           "id": "file=binlog.000003,pos=236",
           "total_order": 2,
           "data_collection_order": 1
       },
       "op": "c",
       "ts_ms": 1731660982009,
       "ts_us": 1731660982009759,
       "ts_ns": 1731660982009759380
}

Experimental Implementation

Let’s think about some of the challenges we would need to address if we wanted to build a stream processing job to aggregate individual CDC events belonging to the same database transaction into one coherent buffer. To reiterate that, the goal is to enable downstream systems to then atomically apply these CDC event buffers and thereby mirror the source database's transactional integrity.

Challenging Aspects for TX Buffering

  • Transactions can affect an arbitrary number of tables and rows respectively.
  • Metadata and CDC events for a transaction are published as separate streams. At least two streams—in general N+1 streams where N is the number of different tables involved—need to be correlated.
  • Buffering CDC events with a stream processor across multiple tasks / machines requires proper data (re)partitioning such that all events belonging to the same transaction can be processed on one specific task / machine.
  • Since CDC events are scattered across several topics and partitions, consuming them in general happens in a non-deterministic sequence as events are read from different streams in parallel.
  • The original transaction order must be restored, since the completion of buffers for the respective transactions is not guaranteed to happen in the same sequence. A large transaction (TX 1) which finished first in the database can take longer to buffer than a small transaction (TX 2) which finished second in the database. In this case, the completed buffer for TX 2 must be retained and cannot be emitted until after TX 1—which came first in the database—was fully processed.

Proof of Concept Flink Job

Let’s do a quick thought experiment to understand how a Flink job for transactional buffering of CDC events could look like on a high-level.

Job Input

It all starts with defining the job’s input data which is read using the KafkaSource connector for Flink’s DataStream API. The first input (Kafka Source 1) reads from all relevant CDC topics plus the transaction metadata topic into a DataStream. Then there is a second input (Kafka Source 2) which only reads from the transaction metadata topic and is used to create a BroadcastStream.

Job Processing

All events coming in via the Kafka Source 1 get keyed by the transaction identifier which results in a KeyedStream. This is done to ensure that all events get properly partitioned such that everything belonging to the same transaction ends up on the same Flink task manager instance when doing parallel processing. Kafka Source 2 backs a BroadcastStream and is used to maintain broadcast state that is accessible on every parallel processing instance.

With both input streams defined, the keyed stream and broadcast stream are then connected to be further processed with the custom transaction buffering logic that is expressed by means of Flink’s KeyedBroadcastProcessFunction for which there are three essential methods to be implemented:

processElement(...)

This method is called for each event in the keyed stream which includes both standard CDC events as well as transaction metadata events. Hence, the event type decides how to process the respective payload. The key representing the transaction identifier is used to check if a new TX buffer needs to be created or the current event can be added to the existing one. The TX buffer is maintained as keyed state. Whenever a completed TX buffer is detected, a timer is registered to perform a chronology check and decide if the completed TX buffer can be emitted, which is done based on the broadcast state.

processBroadcastElement(...)

Is called for each transaction metadata in the broadcast stream. The primary responsibility of this method is to perform all the bookkeeping with regard to the chronology of incoming transactions by using broadcast state. Additionally, this method reacts to custom metadata events which are sent to signal successfully emitted TX buffers. The fact that transaction chronology is maintained as broadcast state allows all task manager instances to locally interact with their respective copy of said state.

onTimer(...)

When a previously registered timer for a completed TX buffer fires, its main purpose is to perform a chronology check for a given transaction identifier by consulting the broadcast state. Any specific TX buffer is safe to emit whenever there is no pending i.e. completed but not yet emitted TX buffer for a transaction identifier which came before the one in question. Depending on the outcome of this check the TX buffer for a given transaction identifier gets either emitted and cleared from the keyed state, or another timer is registered to revisit the chronology check for the same transaction identifier.

Job Output

The job’s main output are the TX buffer events which contain all individual CDC events belonging to the corresponding database transaction. An instance of the KafkaSink connector (Kafka Sink 1) is defined to write these TX buffers into a dedicated output topic. This is what bespoke downstream systems can consume from to process the buffered CDC events accordingly. In addition to emitting the TX buffer itself, a custom metadata event is written to a side output. This custom event is injected back into the transaction metadata topic via another Kafka sink. Doing so is necessary to be able to react to the fact that a TX buffer was successfully emitted by updating the transaction-related bookkeeping (see processBroadcastElement(...) of the KeyedBroadcastProcessFunction) that uses broadcast state under the covers.

Potential Payload Structure for TX Buffers

While CDC events and TX metadata events are well-defined and standardized—in this case by the Debezium project—the exact payload structure of a TX buffer can essentially be designed at will and depends on the respective implementation. By combining the information contained in the transaction metadata and the individual change events, the basic structure of such a TX buffer—referring to the example used throughout this article—could potentially look like this:

{
   "beginMarker": {
"id": "file=binlog.000003,pos=236",
"status": "BEGIN",
"event_count": 0,
"data_collections": null,
"ts_ms": 1732178892000
   },
   "endMarker": {
       "id": "file=binlog.000003,pos=236",
       "status": "END",
       "event_count": 2,
       "data_collections": [
           {
               "data_collection": "inventory.customers",
               "event_count": 1
           },
           {
               "data_collection": "inventory.addresses",
               "event_count": 1
           }
       ],
       "ts_ms": 1732178892000
   },
   "buffer": {
       "inventory.customers": [
           {
               "key": "FULL_CDC_EVENT_KEY_PAYLOAD_HERE",
               "value": "FULL_CDC_EVENT_VALUE_PAYLOAD_HERE"
           }
       ],
       "inventory.addresses": [
           {
              "key": "FULL_CDC_EVENT_KEY_PAYLOAD_HERE",
              "value": "FULL_CDC_EVENT_VALUE_PAYLOAD_HERE"
           }
       ]
   }
}

With TX buffers like this written into a dedicated Kafka output topic, bespoke downstream consumers which retrieve one such buffer full of CDC events for a specific transaction and want to apply its contents atomically then need to:

  1. Disassemble the TX buffer by creating respective SQL statements according to the change event type
  2. Start a transaction
  3. Apply all INSERT / UPDATE / DELETE statements derived from the TX buffer
  4. Commit the transaction

The experimental code base for the Flink job is available in our GitHub examples repository.

Limitations and Alternatives

While the experimental implementation to aggregate CDC events is rather flexible and should be able to cope with basic CRUD workloads, it's generally unsuited to support arbitrarily large database transactions. If, for instance, several hundred thousand table rows get changed within a single database transaction, the corresponding CDC event buffer would end up being rather large. It might easily exceed certain limits such as the maximum message size for records written into the Kafka output topic. One way to work around this problem would be to directly feed TX buffers into a target system rather than producing them into a Kafka output topic. This could be achieved using a bespoke Flink DataSink implementation that understands the different semantics for a TX buffer’s contents and can atomically process all contained events.

Besides a custom stream processing job, what about other implementation strategies to achieve the same thing?

First, there is the outbox pattern which might allow us to circumvent the problem of dealing with individual change events altogether. If transactional scope only matters for pre-defined aggregate structures—customers with their addresses or purchase orders with their order lines—there would be no need to communicate individual change events and reassemble them according to transactional boundaries. Instead, any changes for a single aggregate structure would happen transactionally at the source side and the outbox table could be used to communicate complete aggregates to the outside world. Then, any client would only ever get to see valid states for a single aggregate in question. Of course, the scope of such an implementation is considerably narrower and lacks the flexibility to support transactional semantics for any changes beyond pre-defined aggregate structures.

A second alternative for buffering multiple individual change events in the context of Kafka Connect could be to write a custom single message transformation (SMT). If the SMT gets to see transaction markers and every individual change event in order, it can buffer all CDC events falling between a begin and end marker and emit completed buffers on the fly. It is worth noting that a naive approach like this would effectively need unbounded memory and cannot provide any reasonable fault tolerance concerning the event buffer itself, as all operations happen in memory.

Summary

This article discussed the challenge of maintaining data consistency when processing CDC events from transactional systems such as relational databases. It highlighted how the lack of transactional boundaries in CDC event streams can temporarily result in invalid state—such as partial updates from multi-table transactions—that never existed in the source database. To mitigate this issue, the aggregating of CDC events based on their original transactional context was proposed as one solution. In order for this to work, Debezium can be configured to enrich data change events with all the required transaction-related metadata.

Finally, an experimental stream processing job using Apache Flink was briefly explored to demonstrate the practical aspects of this approach. The job's main output is a stream of transaction-aware CDC event buffers that downstream systems can consume and process, thereby effectively mimicking the source database's transactional integrity at the sink side.

📫 Email signup 👇

Did you enjoy this issue of Checkpoint Chronicle? Would you like the next edition delivered directly to your email to read from the comfort of your own home?

Simply enter your email address here and we'll send you the next issue as soon as it's published—and nothing else, we promise!

Hans-Peter Grahsl

Hans-Peter Grahsl is a Staff Developer Advocate at Decodable. He is an open-source community enthusiast and in particular passionate about event-driven architectures, distributed stream processing systems and data engineering. For his code contributions, conference talks and blog post writing at the intersection of the Apache Kafka and MongoDB communities, Hans-Peter received multiple community awards. He likes to code and is a regular speaker at developer conferences around the world.