Flink CDC is an interesting part of Apache Flink that I’ve been meaning to take a proper look at for some time now. Originally created by Ververica in 2021 and called “CDC Connectors for Apache Flink”, it was donated to live under the Apache Flink project in April 2024.
In this post I’m going to look at what Flink CDC actually is (because it took me a while to properly grok it), and consider where it fits into the data engineers’ toolkit. I’m looking at this from a non-coding point of view—SQL and YAML is all we need, right? 😉 For Java and PyFlink your mileage may vary (YMMV) on this, but I still think Flink CDC has a strong home even in this part of the ecosystem for some use cases.
Let’s start with what Flink CDC does. It provides two distinct features:
- Pipeline Connectors to create CDC pipelines declaratively from YAML. This is really powerful and its value shouldn’t be underestimated.
- A set of CDC source connectors. These are also pretty cool, but in a sense “just” connectors.
The relationship between these two is somewhat lop-sided: whilst the pipelines rely on a CDC source connector, not all CDC source connectors have a corresponding pipeline connector. The upshot of this is that as of Flink CDC 3.2 (early December 2024) the only source pipeline connector is for MySQL, so if you are working with a different database, you’ll have to use the respective source connector directly. The source connectors support the Flink Table API and so can be used from Flink SQL, PyFlink, and Java. Source connectors include Postgres, Oracle, MySQL, and several more.
In terms of sink pipeline connectors there is a bit more variety, with support for Elasticsearch, Kafka, Paimon, and several other technologies.
But let’s not get bogged down in connectors. The declarative pipelines feature of Flink CDC is the bit that is really important to get your head around, as it’s pretty darn impressive. Many of the gnarly or tedious (or tediously gnarly) things that come about in building a data integration pipeline such as schema evolution, full or partial database sync, primary key handling, data type translation, and transformation are all handled for you.
Here’s an example of a YAML file for a continuous sync of tables in MySQL to Elasticsearch:
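It's just a handful of lines; here's a sketch of what it looks like (the hostnames and credentials are placeholders, and the Elasticsearch sink options are worth double-checking against the connector docs):

```yaml
source:
  type: mysql
  hostname: mysql          # placeholder hostname
  port: 3306
  username: flinkcdc
  password: "********"
  tables: inventory.\.*    # every table in the inventory schema

sink:
  type: elasticsearch
  hosts: http://elasticsearch:9200

pipeline:
  name: Sync MySQL inventory to Elasticsearch
  parallelism: 1
```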
That’s it! And with that Flink CDC will sync all the tables under the inventory schema over to corresponding indices in Elasticsearch. It’ll map primary keys to document IDs, update documents in-place if the source MySQL row changes, and so on.
Why are Flink CDC pipelines such a big deal?
Look at that YAML up there 👆
That’s all it takes to generate and run a Flink job:
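Assuming the YAML is saved as <span class="inline-code">mysql-to-elasticsearch.yaml</span> (a filename of my choosing), you submit it with the <span class="inline-code">flink-cdc.sh</span> script that ships with the Flink CDC distribution, with <span class="inline-code">FLINK_HOME</span> pointing at your Flink installation:

```bash
# Submits the pipeline as a regular Flink job to the cluster
# that FLINK_HOME points at
./bin/flink-cdc.sh mysql-to-elasticsearch.yaml
```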
A Flink CDC pipeline can do pretty much all the heavy lifting of data integration for you, including:
- Matching to one, some, or all of the tables in the source database
- Snapshotting the source tables, using CDC to capture and propagate all subsequent changes
- Providing exactly-once semantics
- Low latency streaming, not batch
- Support for schema evolution
- Data transformation
- Data type translation
We’re going to take a look at those later in this post, but first up, let’s explore more about what it is that makes Flink CDC pipelines such a big deal.
I still don’t get it. How’s this different from Flink SQL? Or, Can’t I just use PyFlink?
I’ve written before about using Flink SQL for ETL pipelines. At a very high level, the pattern looks like this:
- Get your head around Flink catalogs and then try them out until you’ve decided which you’ll use. That, or just use the in-memory Flink catalog and get used to having to re-run all your DDL each time you restart your Flink SQL session…
- Create a Flink SQL table using the source connector. If you want data from a database this might well be one of the Flink CDC source connectors, or any other that Flink offers.
- Create a Flink SQL table using a sink connector.
- Run <span class="inline-code">INSERT INTO my_sink SELECT * FROM my_source</span>.
- Profit.
But…the devil is most definitely in the detail. I’m feeling benevolent so we’ll skip over #1 and assume that you have Flink catalogs completely understood and in hand. Let’s consider #2. What does that statement look like? What’s the equivalent of our Flink CDC pipeline YAML?
First off we need to know what the tables are, so we need to query MySQL:
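In MySQL that's a one-liner:

```sql
SHOW TABLES FROM inventory;
```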
OK, so we’ve got six tables. When we create a Flink table we need to specify the schema. Therefore, we need the schema for each of the source tables:
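Taking <span class="inline-code">customers</span> as the example (and repeating this for each of the other five tables):

```sql
SHOW CREATE TABLE inventory.customers;
```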
Now we need to tidy the DDL up so that it’s usable in Flink SQL. We’ll ignore the <span class="inline-code">DROP TABLE</span> and the <span class="inline-code">ENGINE […]</span> bits, and add on the necessary <span class="inline-code">WITH</span> clause for the connector:
The trouble is, it needs more tidy-up than that:
OK, strip that out. Still not enough:
Let’s ditch the <span class="inline-code">KEY</span> stuff then, which seems to work:
…except that it doesn’t:
So with the primary key added back in (along with <span class="inline-code">NOT ENFORCED</span>), the successful statement looks like this:
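Approximately, anyway (the connection details here are placeholders and I've simplified the column types):

```sql
CREATE TABLE customers (
  id          INT NOT NULL,
  first_name  STRING,
  last_name   STRING,
  email       STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'     = 'mysql-cdc',
  'hostname'      = 'mysql',
  'port'          = '3306',
  'username'      = 'flinkcdc',
  'password'      = '********',
  'database-name' = 'inventory',
  'table-name'    = 'customers'
);
```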
Let’s verify that the data is being fetched from MySQL:
With the source table created (one table of the six in MySQL, remember), let’s assume the others will be fine (narrator: they weren’t; turns out <span class="inline-code">ENUM</span> and <span class="inline-code">GEOMETRY</span> also need some manual intervention) and look at the sink end of the pipeline. This, in theory, should be simpler. Right?
Our equivalent of the Flink CDC pipeline YAML...
...is a series of Flink SQL tables using the Elasticsearch connector to stream data from the corresponding source Flink SQL table. Continuing with the <span class="inline-code">customers</span> example that we eventually successfully created above, the Elasticsearch sink looks like this:
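Something like this, anyway (the table name and connection details are placeholders of mine):

```sql
CREATE TABLE customers_es WITH (
  'connector' = 'elasticsearch-7',
  'hosts'     = 'http://elasticsearch:9200',
  'index'     = 'customers'
) AS
SELECT * FROM customers;
```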
Good ol’ <span class="inline-code">CREATE TABLE…AS SELECT</span>, or <span class="inline-code">CTAS</span> as it’s affectionately known. The problem is, it’s not this easy. To start with, everything looks just great; here’s one of the MySQL rows as a document in Elasticsearch:
If you’re super eagle-eyed perhaps you can spot the problem. I wasn’t and didn’t, and as a result assumed that when I updated MySQL:
I would not only see it reflected in the Flink SQL table, which I did (as a pair of change records):
but also in Elasticsearch, with the document showing the update. But instead I got this:
Instead of updating the document in place, I got a new document appended to the index. The reason was that the Flink SQL table with the Elasticsearch sink didn’t have the primary key propagated to it as part of the <span class="inline-code">CREATE TABLE…AS SELECT</span>. You can verify this from Flink SQL—note the empty key column:
Since there’s no primary key, Elasticsearch is just creating its own key for each document created, and thus the <span class="inline-code">UPDATE</span> doesn’t propagate as we’d expect.
The fix is therefore to make sure there is a primary key declared. We can’t do this with CTAS itself:
So instead we have to split it into a <span class="inline-code">CREATE</span> and then an <span class="inline-code">INSERT</span>:
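Along these lines (<span class="inline-code">customers01_es</span> is a table name I've made up; connection details are placeholders):

```sql
CREATE TABLE customers01_es (
  id          INT,
  first_name  STRING,
  last_name   STRING,
  email       STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts'     = 'http://elasticsearch:9200',
  'index'     = 'customers01'
);

INSERT INTO customers01_es
SELECT * FROM customers;
```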
(Note that I’ve taken the lazy route and sent the data for this updated approach to a new Elasticsearch index, <span class="inline-code">customers01</span>, to avoid further complications.)
Now the <span class="inline-code">id</span> field is used as the <span class="inline-code">document _id</span>:
and when the record is updated in MySQL:
the document in Elasticsearch gets updated correctly too:
Phew, we got there in the end, right?! 😅 But that was one table. Of six. And all we’re doing is handling the relatively straightforward business of primary key propagation. This example in itself shows how Flink CDC pipelines are going to save orders of magnitude in engineering effort when it comes to building pipelines. Just to remind ourselves, all of what we did here (and more, since we only actually worked through one table!) is accomplished with the few lines of YAML that we saw at the start of this post.
Flink CDC is solving a common problem
Before the advent of Flink CDC 3.0 and its notion of end-to-end pipelines, maintaining this kind of data flow was much more cumbersome, requiring you to define a potentially large number of bespoke source and sink tables by hand. In Decodable, we have simplified this set-up via things like multi-stream connectors and declarative resource management, and it’s great to see that similar concepts are now available in Flink CDC, substantially reducing the effort of creating and operating comprehensive data pipelines.
Digging into Flink CDC pipelines in more depth
Schema evolution
Just as data doesn’t stand still (that’s why we use streams), the shape of that data is always changing too. If we’ve got a process replicating a set of tables from one place to another, we need to know that it’s going to handle changes to the schema.
Let’s imagine we add a field to the <span class="inline-code">customers</span> table:
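For example (the column itself is arbitrary; it's just something to trigger a schema change):

```sql
-- favourite_colour is just an arbitrary new column for this example
ALTER TABLE inventory.customers ADD COLUMN favourite_colour VARCHAR(255);
```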
How does the Flink CDC pipeline that we set running earlier handle it? Well, it doesn’t crash, which is always a nice start:
Not only that, but it seamlessly propagates the column change and update to Elasticsearch:
To do this with Flink SQL would be messy. In fact, I would need to go and learn how to do it (and first, determine if it’s even possible). You’d want to be able to preserve the state of the existing job so that you’re not unnecessarily reprocessing records. Then you’d need to drop the existing source table, create the new table with the updated schema, and then run the <span class="inline-code">INSERT INTO sink_table</span> again, hopefully restoring the state from the previous incarnation.
Perhaps you don’t want schema changes to persist downstream, particularly if they’re destructive (such as removing a column). Flink CDC pipelines can be customised to control how schema evolution is handled (including ignoring it entirely):
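This is done with the pipeline-level <span class="inline-code">schema.change.behavior</span> setting; here's a sketch (the exact set of modes is worth checking against the docs for your version):

```yaml
pipeline:
  name: Sync MySQL inventory to Elasticsearch
  # Modes include evolve, try_evolve, lenient, ignore, and exception
  schema.change.behavior: ignore
```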
You can also customise at the sink level how individual types of schema change event are processed by the sink:
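As far as I can tell this is done with include/exclude options on the sink definition itself; treat the option name and event types below as assumptions and check the schema evolution docs for your version:

```yaml
sink:
  type: elasticsearch
  hosts: http://elasticsearch:9200
  # Assumed option name: filter out destructive schema change events
  exclude.schema.changes: [drop.column, drop.table]
```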
Routing and renaming tables
By default, Flink CDC pipelines will use the source table name as the target too. In our example <span class="inline-code">inventory.customers</span> in MySQL is written to Elasticsearch as exactly that—an index called <span class="inline-code">inventory.customers</span>.
Using the route YAML configuration you can customise the name of the target object, and through the same method, fan-in multiple source tables to a single target (by setting all the sources to a single target object name). You can also selectively rename certain tables matching a pattern. Here’s an example which will add a prefix to any table matching the <span class="inline-code">inventory.product*</span> pattern:
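Here's a sketch of that route configuration, where the <> symbol gets swapped in for the matched table name:

```yaml
route:
  - source-table: inventory.product\.*
    sink-table: routing_example__<>
    replace-symbol: <>
    description: add a prefix to any table matching inventory.product*
```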
Which from a set of MySQL tables like this:
is streamed into these Elasticsearch indices—notice the <span class="inline-code">routing_example__</span> prefix on the two <span class="inline-code">product*</span> tables:
Unlike schema evolution, the routing and renaming of tables is something that’s handled in a fairly straightforward way with Flink SQL; you’d simply change the target object name as part of the DDL. For routing multiple tables into one you’d have multiple <span class="inline-code">INSERT INTO</span> statements, or a single <span class="inline-code">INSERT</span> made up of a series of <span class="inline-code">UNION</span>s. It’s not as elegant or self-explanatory as declarative YAML, but nor is it the eye-wateringly painful process of setting up the multiple tables in the pipeline in the first place.
One more use of route to mention is if you’re iteratively developing a pipeline and don’t want to have to clean out the target system each time you restart it. This YAML will add a suffix to each table name from the inventory database when it’s written to the sink. Each time you have a new test version of the pipeline, bump the suffix and it’ll write to a new set of tables.
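A sketch of that (the suffix itself is arbitrary):

```yaml
route:
  - source-table: inventory.\.*
    sink-table: <>_test01
    replace-symbol: <>
    description: suffix every inventory table for test run 01
```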
Transforms
As with routing, transforms kind of come as part and parcel of using Flink SQL to build a pipeline. If you’re already having to write a <span class="inline-code">CREATE TABLE</span> and <span class="inline-code">INSERT INTO</span> manually, then customising these to apply transformations is pretty standard. Say you want to create a new field that’s a composite of two others, in Flink SQL you’d do this:
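Assuming the usual <span class="inline-code">id</span>, <span class="inline-code">first_name</span>, <span class="inline-code">last_name</span>, and <span class="inline-code">email</span> columns, and a sink table (here called <span class="inline-code">customers_with_full_name</span>, a name of my choosing) that already includes the extra column, it's something like:

```sql
INSERT INTO customers_with_full_name
SELECT id,
       first_name,
       last_name,
       email,
       CONCAT(first_name, ' ', last_name) AS full_name
FROM   customers;
```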
An actual example of this in practice, concatenating the <span class="inline-code">first_name</span> and <span class="inline-code">last_name</span> fields in the <span class="inline-code">customers</span> table, is a somewhat less cumbersome and error-prone declarative instruction in the Flink CDC pipeline’s transform YAML:
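Roughly like this:

```yaml
transform:
  - source-table: inventory.customers
    projection: \*, CONCAT(first_name, ' ', last_name) AS full_name
    description: add a composite full_name field
```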
Note the <span class="inline-code">\*</span> — this is adding all the existing fields to the projection (it’s a <span class="inline-code">*</span> escaped with a <span class="inline-code">\</span>). Without it the only field that would be included from <span class="inline-code">customers</span> would be the new one. Put another way, projection is basically whatever you’d write in your <span class="inline-code">SELECT</span> (which is indeed what a <span class="inline-code">SELECT</span> is—the projection clause of a SQL statement).
If you go and look in the Flink Web UI you’ll see an additional Transform operator has been added:
As well as adding fields with a transformation, you can remove them too, by omitting them from the projection list. For example, if a <span class="inline-code">customer_details</span> source table has the fields <span class="inline-code">account_id, update_ts, social_security_num</span> and you want to omit the personally identifiable information (PII), you could do this in the pipeline with:
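Something along these lines:

```yaml
transform:
  - source-table: inventory.customer_details
    projection: account_id, update_ts
    description: drop social_security_num (PII) before it reaches the sink
```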
The same principle would apply to doing this in Flink SQL.
Unfortunately, neither Flink SQL nor CDC pipelines support an exclusion parameter for projections in the way that some engines, including Snowflake and DuckDB, do. That means you can’t do something like <span class="inline-code">SELECT * EXCLUDE social_security_num FROM customer_details</span>, which makes the pipeline a bit more brittle than is ideal, since you’d need to modify it every time a new non-PII column was added to the schema.
You can also use transformations to lift metadata from the pipeline rows into the target table:
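For example, recording which source database and table each record came from; the double-underscore metadata column names below are as I recall them from the transform docs, so verify them for your version:

```yaml
transform:
  - source-table: inventory.\.*
    projection: \*, __namespace_name__ AS source_db, __table_name__ AS source_table
    description: add source metadata to every record
```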
Note that combining transforms can get a bit problematic if they’re both modifying the projection for the same table. If in doubt, check the Flink logs for errors.
Some notes about running Flink CDC Pipelines
As I’ll discuss later in this post, there are still some gaps in Flink CDC. One of those is the ops side of things. Running a pipeline is simple enough:
But then what? If the pipeline fails to compile, you’ll get an error back on the console. But assuming it compiles, it still might not work. From this point on, it’s simply a Flink job to inspect and manage like any other. You’ve got the built-in Flink Web UI, as well as CLI and REST API options:
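For example, listing jobs with the CLI or via the REST API (assuming the default REST port of 8081):

```bash
# List jobs with the Flink CLI...
./bin/flink list

# ...or via the REST API
curl -s localhost:8081/jobs
```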
Using these you can cancel a running job by passing its id (which you’ll also get from Flink CDC when it launches the pipeline):
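With the CLI that's simply (the job id shown is just an example value):

```bash
./bin/flink cancel a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6
```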
You can also cancel all running jobs on the cluster, which if you’re running on a sandbox instance and randomly jiggling things until they unbreak can be a bit of a timesaver:
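One way of doing it is with the REST API plus <span class="inline-code">jq</span>; here's a sketch (adjust host and port for your environment):

```bash
# Find every job that's RUNNING or RESTARTING and ask Flink to cancel it
curl -s localhost:8081/jobs |
  jq -r '.jobs[] | select(.status=="RUNNING" or .status=="RESTARTING") | .id' |
  xargs -I{} curl -s -X PATCH "localhost:8081/jobs/{}?mode=cancel"
```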
If you look closely at the above logic, it’s not only <span class="inline-code">RUNNING</span> jobs that are matched, but also <span class="inline-code">RESTARTING</span>. One of the things that I noticed a lot was that jobs might be logically invalid (e.g. specifying a column that doesn’t exist) but don’t abort; instead they simply go into a seemingly infinite <span class="inline-code">RESTARTING/RUNNING</span> loop. The problem with this is that it’s not always apparent that there’s even a problem, particularly if you happen to look at the job status when it’s <span class="inline-code">RUNNING</span> and before it’s failed. (This is where the route suffix trick from earlier comes in handy, so that you don’t have to clean up the target each time when you’re iteratively testing.)
This sounds great. What’s the catch?
It’s still pretty early days with Flink CDC, and 3.3 is being worked on as I type, so it’ll be good to see how it shapes up.
Put bluntly, Flink CDC pipelines show a ton of potential. Is it a cool idea? Yup. Does the world need more YAML? Of course! Does it open up stream processing and integration to non-Java coders more than Flink SQL does? Definitely. Is it ready for much more than just experimentation? Well…
If your source database is MySQL, and a pipeline connector exists for your target technology, then sure, it’s worth keeping an eye on. If open source development is your thing then jump right in and get involved, because that’s what it needs. A bit more polish, and a lot more pipeline connectors.
Again, I want to be clear—this is a relatively young project. Apache Flink itself has been around for TEN years, whilst Flink CDC has been around for much less than that. As a huge fan of the open source world, I will absolutely be cheering for Flink CDC to grow and succeed. As a data engineer looking around at tools to get my job done, I am still cautious until it’s proved itself further.
From that angle, that of a data engineer evaluating a tool, one of the key areas in which it really needs to develop is connectors. It’s missing many of the common sinks such as Snowflake, Apache Iceberg, and relational databases. On the source side, I’d hope that the existing set of Flink CDC source connectors soon make it into the pipeline connector world too. Whilst you could work around a missing sink connector by using the Kafka sink pipeline connector and then taking the data from Kafka to the desired sink with Flink SQL (or Kafka Connect), this rather defeats the point of a single tool for building a pipeline declaratively. It helps a little bit, but you’re still where you are today—piecing together bits of SQL and config manually into a bespoke pipeline.
Other things that will help it grow its adoption include:
- Better validation of input, and cleaner errors when things don’t go right
- Support for stateful pipelines—at the moment you can’t join or aggregate.
- Clearer and more accurate documentation, both for specifics and also the project overall. It took me writing this post to really get my head around the real power of Flink CDC pipelines and what they actually are (and aren’t).
- The ops side of things seems a bit rudimentary. After compilation it’s “fire & forget”, leaving you to poke around the standard Flink job management interfaces. This is perhaps “icing on the cake”, but if it’s simplifying building jobs, why not simplify running them too?
- Support for Debezium 3.0 and its new functionality, including better snapshotting, improved performance, and richer support for things like Postgres replication slots on read replicas.
Ultimately, Flink CDC brings a really rich set of functionality, so let’s hope it can really develop and thrive as part of the Apache Flink ecosystem.
If you’d like to try Flink CDC out for yourself, you can find a Docker Compose file and full details on GitHub.