December 11, 2024
9 min read

Exploring Flink CDC

By Robin Moffatt

Flink CDC is an interesting part of Apache Flink that I’ve been meaning to take a proper look at for some time now. Originally created by Ververica in 2021 and called “CDC Connectors for Apache Flink”, it was donated to live under the Apache Flink project in April 2024.

In this post I’m going to look at what Flink CDC actually is (because it took me a while to properly grok it), and consider where it fits into the data engineers’ toolkit. I’m looking at this from a non-coding point of view—SQL and YAML is all we need, right? 😉 For Java and PyFlink your mileage may vary (YMMV) on this, but I still think Flink CDC has a strong home even in this part of the ecosystem for some use cases.

Let’s start with what Flink CDC does. It provides two distinct features:

  • Pipeline Connectors to create CDC pipelines declaratively from YAML. This is really powerful and its value shouldn’t be underestimated.
  • A set of CDC source connectors. These are also pretty cool, but in a sense “just” connectors.

The relationship between these two is somewhat lop-sided: whilst the pipelines rely on a CDC source connector, not all CDC source connectors have a corresponding pipeline connector. The upshot of this is that as of Flink CDC 3.2 (early December 2024) the only source pipeline connector is for MySQL, so if you are working with a different database, you’ll have to use the respective source connector directly. The source connectors support the Flink Table API and so can be used from Flink SQL, PyFlink, and Java. Source connectors include Postgres, Oracle, MySQL, and several more.

In terms of sink pipeline connectors there is a bit more variety, with support for Elasticsearch, Kafka, Paimon, and several other technologies.

But let’s not get bogged down in connectors. The declarative pipelines feature of Flink CDC is the bit that is really important to get your head around, as it’s pretty darn impressive. Many of the gnarly or tedious (or tediously gnarly) things that come about in building a data integration pipeline such as schema evolution, full or partial database sync, primary key handling, data type translation, and transformation are all handled for you.

Here’s an example of a YAML file for a continuous sync of tables in MySQL to Elasticsearch:

pipeline:
  name: Sync MySQL inventory tables to Elasticsearch
  
source:  
  type: mysql  
  hostname: mysql-server.acme.com
  port: 3306  
  username: db_user  
  password: Password123
  tables: inventory.\.*  
  
sink:  
  type: elasticsearch
  version: 7  
  hosts: http://es-host.acme.com:9200

That’s it! And with that Flink CDC will sync all the tables under the inventory schema over to corresponding indices in Elasticsearch. It’ll map primary keys to document IDs, update documents in-place if the source MySQL row changes, and so on.

Why are Flink CDC pipelines such a big deal?

Look at that YAML up there 👆

That’s all it takes to generate and run a Flink job:

./bin/flink-cdc.sh mysql-to-es.yaml
Pipeline has been submitted to cluster.
Job ID: 21fd74e8f7ed2a9cde3b7b855e1bed55 
Description: Sync MySQL inventory tables to Elasticsearch

A Flink CDC pipeline can do pretty much all the heavy lifting of data integration for you, including:

  • Matching to one, some, or all of the tables in the source database
  • Snapshotting the source tables, using CDC to capture and propagate all subsequent changes
  • Providing exactly-once semantics
  • Low latency streaming, not batch
  • Support for schema evolution
  • Data transformation
  • Data type translation

We’re going to take a look at those later in this post, but first up, let’s explore more about what it is that makes Flink CDC pipelines such a big deal.

I still don’t get it. How’s this different from Flink SQL? Or, Can’t I just use PyFlink?

I’ve written before about using Flink SQL for ETL pipelines. At a very high level, the pattern looks like this:

  1. Get your head around Flink catalogs and then try them out until you’ve decided which you’ll use. That, or just use the in-memory Flink catalog and get used to having to re-run all your DDL each time you restart your Flink SQL session…
  2. Create a Flink SQL table using the source connector. If you want data from a database this might well be one of the Flink CDC source connectors, or any other that Flink offers.
  3. Create a Flink SQL table using a sink connector.
  4. Run <span class="inline-code">INSERT INTO my_sink SELECT * FROM my_source</span>.
  5. Profit.

But…the devil is most definitely in the detail. I’m feeling benevolent so we’ll skip over #1 and assume that you have Flink catalogs completely understood and in hand. Let’s consider #2. What does that statement look like? What’s the equivalent of our Flink CDC pipeline YAML?

source:
  type: mysql  
  hostname: mysql-server.acme.com
  port: 3306  
  username: db_user  
  password: Password123
  tables: inventory.\.*

First off we need to know what the tables are, so we need to query MySQL:

$ mysql -uroot -phunter2 -e "show tables" inventory
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses           |
| customers           |
| geom                |
| orders              |
| products            |
| products_on_hand    |
+---------------------+

OK, so we’ve got six tables. When we create a Flink table we need to specify the schema. Therefore, we need the schema for each of the source tables:

$ mysqldump -uroot -phunter2 --no-data inventory
[…]
--
-- Table structure for table `customers`
--

DROP TABLE IF EXISTS `customers`;
/*!40101 SET @saved_cs_client     = @@character_set_client */;
/*!50503 SET character_set_client = utf8mb4 */;
CREATE TABLE `customers` (
  `id` int NOT NULL AUTO_INCREMENT,
  `first_name` varchar(255) NOT NULL,
  `last_name` varchar(255) NOT NULL,
  `email` varchar(255) NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `email` (`email`)
) ENGINE=InnoDB AUTO_INCREMENT=1005 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
/*!40101 SET character_set_client = @saved_cs_client */;

[…DDL for five other tables…]

Now we need to tidy the DDL up so that it’s usable in Flink SQL. We’ll ignore the <span class="inline-code">DROP TABLE</span> and the <span class="inline-code">ENGINE […]</span> bits, and add on the necessary <span class="inline-code">WITH</span> clause for the connector:

CREATE TABLE `customers` (
  `id` int NOT NULL AUTO_INCREMENT,
  `first_name` varchar(255) NOT NULL,
  `last_name` varchar(255) NOT NULL,
  `email` varchar(255) NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `email` (`email`)
) WITH (
       'connector' = 'mysql-cdc',
       'hostname' = 'mysql',
       'port' = '3306',
       'username' = 'db_user',
       'password' = 'Password123',
       'database-name' = 'inventory',
       'table-name' = 'customers');

The trouble is, it needs more tidy-up than that:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "AUTO_INCREMENT" at line 2, column 21.

OK, strip that out. Still not enough:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "KEY" at line 7, column 10.

Let’s ditch the <span class="inline-code">KEY</span> stuff then, which seems to work:

[INFO] Execute statement succeed.

…except that it doesn’t:

Flink SQL> SELECT * FROM customers;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: 'scan.incremental.snapshot.chunk.key-column' is required for table without primary key when 'scan.incremental.snapshot.enabled' enabled.

So with the primary key added back in (along with <span class="inline-code">NOT ENFORCED</span>), the successful statement looks like this:

CREATE TABLE `customers` (  
  `id` int NOT NULL,  
  `first_name` varchar(255) NOT NULL,  
  `last_name` varchar(255) NOT NULL,  
  `email` varchar(255) NOT NULL,  
  PRIMARY KEY (`id`) NOT ENFORCED)   
  WITH (  
     'connector' = 'mysql-cdc',  
     'hostname' = 'mysql',  
     'port' = '3306',  
     'username' = 'db_user',  
     'password' = 'Password123',  
     'database-name' = 'inventory',  
     'table-name' = 'customers');

Let’s verify that the data is being fetched from MySQL:

Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';  
[INFO] Execute statement succeed.  
  
Flink SQL> SELECT * FROM customers;  
+----+-------+------------+------------+-----------------------+  
| op |    id | first_name |  last_name |                 email |  
+----+-------+------------+------------+-----------------------+  
| +I |  1002 |     George |     Bailey |    gbailey@foobar.com |  
| +I |  1003 |     Edward |     Walker |         ed@walker.com |  
| +I |  1001 |      Sally |     Thomas | sally.thomas@acme.com |  
| +I |  1004 |       Anne |  Kretchmar |    annek@noanswer.org |

With the source table created (one table of the six in MySQL, remember), let’s assume the others will be fine (narrator: they weren’t; it turns out <span class="inline-code">ENUM</span> and <span class="inline-code">GEOMETRY</span> also need some manual intervention) and look at the sink end of the pipeline. This, in theory, should be simpler. Right?

Our equivalent of the Flink CDC pipeline YAML...

sink:  
  type: elasticsearch
  version: 7  
  hosts: http://es-host.acme.com:9200

...is a series of Flink SQL tables using the Elasticsearch connector to stream data from the corresponding source Flink SQL table. Continuing with the <span class="inline-code">customers</span> example that we eventually successfully created above, the Elasticsearch sink looks like this:

CREATE TABLE `es_customers` 
    WITH ('connector'='elasticsearch-7',
          'hosts'='http://es-host.acme.com:9200',
          'index'='customers')
    AS SELECT * FROM customers;

Good ol’ <span class="inline-code">CREATE TABLE…AS SELECT</span>, or <span class="inline-code">CTAS</span> as it’s affectionately known. The problem is, it’s not this easy. To start with, everything looks just great. Here’s one of the MySQL rows as a document in Elasticsearch:

$ http -b "localhost:9200/customers/_search?size=1&filter_path=hits.hits"

If you’re super eagle-eyed perhaps you can spot the problem. I wasn’t and didn’t, and as a result assumed that when I updated MySQL:

$ mysql -uroot -phunter2 -e "UPDATE customers SET last_name='Astley' WHERE id=1004;" inventory

I would not only see it reflected in the Flink SQL table, which I did (as a pair of change records):

Flink SQL> SELECT * FROM customers;  
+----+-------+------------+-----------+-----------------------+  
| op |    id | first_name | last_name |                 email |  
+----+-------+------------+-----------+-----------------------+  
[…]
| -U |  1004 |       Anne | Kretchmar |    annek@noanswer.org |  
| +U |  1004 |       Anne |    Astley |    annek@noanswer.org |

but also in Elasticsearch, with the document showing the update. But instead I got this:

$ http -b POST "localhost:9200/customers/_search" \  
    Content-Type:application/json \  
    query:='{ "match": { "id": "1004" } }'
{  
    "_id": "D7inlpMB3OCvuWyIjxW8",  
    "_index": "customers",  
    "_score": 1.0,  
    "_source": {  
        "email": "annek@noanswer.org",  
        "first_name": "Anne",  
        "id": 1004,  
        "last_name": "Kretchmar"  
    },  
    "_type": "_doc"  
},  
{  
    "_id": "E7iylpMB3OCvuWyI8xXj",  
    "_index": "customers",  
    "_score": 1.0,  
    "_source": {  
        "email": "annek@noanswer.org",  
        "first_name": "Anne",  
        "id": 1004,  
        "last_name": "Astley"  
    },  
    "_type": "_doc"  
}

Instead of updating the document in place, I got a new document appended to the index. The reason was that the Flink SQL table with the Elasticsearch sink didn’t have the primary key propagated to it as part of the <span class="inline-code">CREATE TABLE…AS SELECT</span>. You can verify this from Flink SQL—note the empty key column:

Flink SQL> DESCRIBE es_customers;
+------------+--------------+-------+-----+--------+-----------+
|       name |         type |  null | key | extras | watermark |
+------------+--------------+-------+-----+--------+-----------+
|         id |          INT | FALSE |     |        |           |
| first_name | VARCHAR(255) | FALSE |     |        |           |
|  last_name | VARCHAR(255) | FALSE |     |        |           |
|      email | VARCHAR(255) | FALSE |     |        |           |
+------------+--------------+-------+-----+--------+-----------+
4 rows in set

Since there’s no primary key, Elasticsearch is just creating its own key for each document created, and thus the <span class="inline-code">UPDATE</span> doesn’t propagate as we’d expect.

The fix is therefore to make sure there is a primary key declared. We can’t do this with CTAS itself:

CREATE TABLE `es_customers` (
    `id` int NOT NULL,  
    `first_name` varchar(255) NOT NULL,  
    `last_name` varchar(255) NOT NULL,  
    `email` varchar(255) NOT NULL,  
    PRIMARY KEY (`id`) NOT ENFORCED
    )
    WITH ('connector'='elasticsearch-7',
          'hosts'='http://es-host.acme.com:9200',
          'index'='customers')
    AS SELECT * FROM customers;
[ERROR] Could not execute SQL statement. Reason:  
org.apache.flink.sql.parser.error.SqlValidateException:
CREATE TABLE AS SELECT syntax does not support to specify explicit columns yet.

So instead we have to split it into a <span class="inline-code">CREATE</span> and then an <span class="inline-code">INSERT</span>:

Flink SQL> DROP TABLE `es_customers`;
Flink SQL> CREATE TABLE `es_customers` (
    `id` int NOT NULL,  
    `first_name` varchar(255) NOT NULL,  
    `last_name` varchar(255) NOT NULL,  
    `email` varchar(255) NOT NULL,  
    PRIMARY KEY (`id`) NOT ENFORCED
    )
    WITH ('connector'='elasticsearch-7',
          'hosts'='http://es-host.acme.com:9200',
          'index'='customers01');
[INFO] Execute statement succeed.  
  
Flink SQL> INSERT INTO es_customers SELECT * FROM customers ;  

[INFO] Submitting SQL update statement to the cluster...  
[INFO] SQL update statement has been successfully submitted to the cluster:  
Job ID: a1a7b612ebe8aa9d07636a325cfbdf84

(Note that I’ve taken the lazy route and sent the data for this updated approach to a new Elasticsearch index, <span class="inline-code">customers01</span>, to avoid further complications.)

Now the <span class="inline-code">id</span> field is used as the <span class="inline-code">document _id</span>:

{
    "_id": "1004",
    "_index": "customers01",
    "_score": 1.0,
    "_source": {
        "email": "annek@noanswer.org",
        "first_name": "Anne",
        "id": 1004,
        "last_name": "Astley"
    },
    "_type": "_doc"
}

and when the record is updated in MySQL:

$ mysql -uroot -phunter2 -e "UPDATE customers SET first_name='Rick' WHERE id=1004;" inventory

the document in Elasticsearch gets updated correctly too:

$ http -b POST "localhost:9200/customers01/_search" \
    Content-Type:application/json \
    query:='{ "match": { "id": "1004" } }'
[…]
    "hits": {
        "hits": [
            {
                "_id": "1004",
                "_index": "customers01",
                "_score": 1.0,
                "_source": {
                    "email": "annek@noanswer.org",
                    "first_name": "Rick",
                    "id": 1004,
                    "last_name": "Kretchmar"
                },
[…]

Phew, we got there in the end, right?! 😅 But that was one table. Of six. And all we’re doing is handling the relatively straightforward business of primary key propagation. This example alone shows how Flink CDC pipelines are going to cut the engineering effort of building pipelines by orders of magnitude. Just to remind ourselves, this is how we accomplish all of what we did here (and more, since we only actually worked through one table!), in YAML:

pipeline:
  name: Sync MySQL inventory tables to Elasticsearch
  
source:  
  type: mysql  
  hostname: mysql-server.acme.com
  port: 3306  
  username: db_user  
  password: Password123
  tables: inventory.\.*  
  
sink:  
  type: elasticsearch
  version: 7  
  hosts: http://es-host.acme.com:9200

Flink CDC is solving a common problem

Before the advent of Flink CDC 3.0 and its notion of end-to-end pipelines, maintaining this kind of data flow was much more cumbersome, requiring you to define a potentially large number of bespoke source and sink tables by hand. At Decodable, we have simplified this set-up via things like multi-stream connectors and declarative resource management, and it’s great to see that similar concepts are now available in Flink CDC, substantially reducing the effort of creating and operating comprehensive data pipelines.

Digging into Flink CDC pipelines in more depth

Schema evolution

Just as data doesn’t stand still (which is why we use streams), the shape of that data is always changing too. If we’ve got a process replicating a set of tables from one place to another, we need to know that it’s going to handle changes happening to the schema.

Let’s imagine we add a field to the <span class="inline-code">customers</span> table:

mysql> use inventory;
Database changed

mysql> ALTER TABLE customers ADD country VARCHAR(255);
Query OK, 0 rows affected (0.06 sec)
Records: 0  Duplicates: 0  Warnings: 0

mysql> UPDATE customers SET country='UK' WHERE id=1001;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1  Changed: 1  Warnings: 0

How does the Flink CDC pipeline that we set running earlier handle it? Well, it doesn’t crash, which is always a nice start.

Not only that, but it seamlessly propagates the column change and update to Elasticsearch:

❯ http -b localhost:9200/inventory.customers/_doc/1001
{
    "_id": "1001",
    "_index": "inventory.customers",
    "_primary_term": 1,
    "_seq_no": 4,
    "_source": {
        "country": "UK",
        "email": "sally.thomas@acme.com",
        "first_name": "Sally",
        "id": 1001,
        "last_name": "Thomas"
    },
    "_type": "_doc",
    "_version": 2,
    "found": true
}

To do this with Flink SQL would be messy. In fact, I would need to go and learn how to do it (and first, determine if it’s even possible). You’d want to be able to preserve the state of the existing job so that you’re not unnecessarily reprocessing records. Then you’d need to drop the existing source table, create the new table with the updated schema, and then run the <span class="inline-code">INSERT INTO sink_table</span> again, hopefully restoring the state from the previous incarnation.
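For a rough idea of what that manual dance involves, here’s a sketch (and it is only a sketch; it assumes you’ve stopped the running job with a savepoint first, and it glosses over whether the state will actually restore cleanly):

-- Re-declare the source table with the new column added
DROP TABLE `customers`;
CREATE TABLE `customers` (
  `id` int NOT NULL,
  `first_name` varchar(255) NOT NULL,
  `last_name` varchar(255) NOT NULL,
  `email` varchar(255) NOT NULL,
  `country` varchar(255),              -- the newly-added column
  PRIMARY KEY (`id`) NOT ENFORCED)
  WITH ('connector' = 'mysql-cdc',
        'hostname' = 'mysql',
        'port' = '3306',
        'username' = 'db_user',
        'password' = 'Password123',
        'database-name' = 'inventory',
        'table-name' = 'customers');

-- Do the same for the sink table, then re-submit the job,
-- ideally restoring from the savepoint taken beforehand
INSERT INTO es_customers SELECT * FROM customers;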

Perhaps you don’t want schema changes to persist downstream, particularly if they’re destructive (such as removing a column). Flink CDC pipelines can be customised to control how schema evolution is handled (including ignoring it entirely):

pipeline:
  # Don't pass through any schema changes to the sink
  schema.change.behavior: ignore

You can also customise at the sink level how individual types of schema change event are processed by the sink:

sink:
  type: elasticsearch
  version: 7  
  hosts: http://es-host.acme.com:9200  
  exclude.schema.changes: truncate.table

Routing and renaming tables

By default, Flink CDC pipelines will use the source table name as the target too. In our example <span class="inline-code">inventory.customers</span> in MySQL is written to Elasticsearch as exactly that—an index called <span class="inline-code">inventory.customers</span>.

Using the route YAML configuration you can customise the name of the target object, and through the same method, fan-in multiple source tables to a single target (by setting all the sources to a single target object name). You can also selectively rename certain tables matching a pattern. Here’s an example which will add a prefix to any table matching the <span class="inline-code">inventory.product*</span> pattern:

route:
  - source-table: inventory.product\.*
    sink-table: routing_example__$
    replace-symbol: $
    description: > 
        route all tables that begin with 'product'
        to a target prefixed with 'routing_example__'

Which from a set of MySQL tables like this:

mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses           |
| customers           |
| geom                |
| orders              |
| products            |
| products_on_hand    |
+---------------------+
6 rows in set (0.00 sec)

is streamed into these Elasticsearch indices—notice the <span class="inline-code">routing_example__</span> prefix on the two <span class="inline-code">product*</span> tables:

❯ http -b "localhost:9200/_cat/indices/?h=index"
inventory.geom
routing_example__products
inventory.customers
inventory.addresses
routing_example__products_on_hand
inventory.orders

Unlike schema evolution, routing and renaming tables is handled in a fairly straightforward way with Flink SQL; you’d change the target object name as part of the DDL. For routing multiple tables into one you have multiple <span class="inline-code">INSERT INTO</span> statements, or a single <span class="inline-code">INSERT</span> made up of a series of <span class="inline-code">UNION</span>s. It’s not as elegant or self-explanatory as declarative YAML, but it’s nowhere near as eye-wateringly painful as getting the multiple tables into the pipeline in the first place.
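For illustration only, fanning two hypothetical source tables (<span class="inline-code">orders_eu</span> and <span class="inline-code">orders_us</span>, assumed to share a schema) into a single sink table in Flink SQL might look like one of these:

-- Option 1: separate INSERT statements writing to the same sink table
INSERT INTO es_orders SELECT * FROM orders_eu;
INSERT INTO es_orders SELECT * FROM orders_us;

-- Option 2: a single INSERT built from a UNION ALL
INSERT INTO es_orders
SELECT * FROM orders_eu
UNION ALL
SELECT * FROM orders_us;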

One more use of route to mention is if you’re iteratively developing a pipeline and don’t want to have to clean out the target system each time you restart it. This YAML will add a suffix to each table name from the inventory database when it’s written to the sink. Each time you have a new test version of the pipeline, bump the suffix and it’ll write to a new set of tables.

route:
  - source-table: inventory.\.*
    sink-table: $-01
    replace-symbol: $
    description: Add a suffix to all tables

Transforms

As with routing, transforms kind of come as part and parcel of using Flink SQL to build a pipeline. If you’re already having to write a <span class="inline-code">CREATE TABLE</span> and <span class="inline-code">INSERT INTO</span> manually, then customising these to apply transformations is pretty standard. Say you want to create a new field that’s a composite of two others, in Flink SQL you’d do this:

-- Declare the table as before, but adding a new column
-- to hold the computed field ('new_field')
CREATE TABLE source_tbl (existing_field VARCHAR(255),
                         …,
                         …,
                         new_field VARCHAR(255),
                         PRIMARY KEY ('key_field') NOT ENFORCED)
        WITH ('connector-config' = 'goes here'
              …)

-- Run the INSERT INTO and add the expression to compute the new field
INSERT INTO target SELECT existing_field,
                          …,
                          …,
                          src_field1 || ' ' || src_field_2 AS new_field
                     FROM src_table;

Doing this in practice with a Flink CDC pipeline’s transform YAML (here, concatenating the <span class="inline-code">first_name</span> and <span class="inline-code">last_name</span> fields in the <span class="inline-code">customers</span> table) takes a somewhat less cumbersome and error-prone declarative instruction:

transform:
  - source-table: inventory.customers
    projection: \*, first_name || ' ' || last_name as full_name
    description: Concatenate name fields together in customers table

Note the <span class="inline-code">\*</span> — this is adding all the existing fields to the projection (it’s a <span class="inline-code">*</span> escaped with a <span class="inline-code">\</span>). Without it the only field that would be included from <span class="inline-code">customers</span> would be the new one. Put another way, projection is basically whatever you’d write in your <span class="inline-code">SELECT</span> (which is indeed what a <span class="inline-code">SELECT</span> is—the projection clause of a SQL statement).

If you go and look in the Flink Web UI you’ll see that an additional Transform operator has been added.

As well as adding fields with a transformation, you can remove them too, by omitting them from the projection list. For example, if a <span class="inline-code">customer_details</span> source table has the fields <span class="inline-code">account_id, update_ts, social_security_num</span> and you want to omit the personally identifiable information (PII), you could do this in the pipeline with:

transform:
  - source-table: inventory.customer_details
    projection: >
      account_id,
      update_ts
    description: Don't include PII in the pipeline

The same principle applies to doing this in Flink SQL.
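For comparison, a minimal Flink SQL sketch of the same idea (assuming source and sink tables named <span class="inline-code">customer_details</span> and <span class="inline-code">es_customer_details</span> have already been declared):

-- Only project the non-PII columns into the sink;
-- social_security_num simply never gets selected
INSERT INTO es_customer_details
SELECT account_id,
       update_ts
  FROM customer_details;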

Unfortunately, neither Flink SQL nor Flink CDC pipelines support an exclusion parameter for projections in the way that some engines (including Snowflake and DuckDB) do, which means that you can’t do something like <span class="inline-code">SELECT * EXCLUDE social_security_num FROM customer_details</span>. This makes the pipeline a bit more brittle than is ideal, since you’d need to modify it every time a new non-PII column was added to the schema.

You can also use transformations to lift metadata from the pipeline rows into the target table:

transform:
  - source-table: inventory.\.*
    projection: >
      \*,
      __table_name__,  __namespace_name__,  __schema_name__,  __data_event_type__
    description: Add metadata to all tables from inventory db

Note that combining transforms can get a bit problematic if they’re both modifying the projection for the same table. If in doubt, check the Flink logs for errors.
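As a purely hypothetical illustration, these two rules both end up rewriting the projection for <span class="inline-code">inventory.customers</span>, which is exactly the kind of overlap to watch out for:

transform:
  - source-table: inventory.customers
    projection: \*, first_name || ' ' || last_name as full_name
    description: Concatenate name fields together in customers table
  - source-table: inventory.\.*
    projection: >
      \*,
      __table_name__
    description: Add metadata to all tables (including customers)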

Some notes about running Flink CDC Pipelines

As I’ll discuss later in this post, there are still some gaps in Flink CDC. One of those is the ops side of things. Running a pipeline is simple enough:

$ ./bin/flink-cdc.sh mysql-to-es.yaml

But then what? If the pipeline fails to compile, you’ll get an error back to the console. But assuming it compiles, it still might not work. From this point on, it’s simply a Flink job to inspect and manage as any other. You’ve got the built-in Flink Web UI which is also useful, as well as CLI and REST API options:

$ ./bin/flink list -r

------------------ Running/Restarting Jobs -------------------
05.12.2024 22:14:44 : ed609d9e2af52260ba7cc075ab6480c1 : Sync MySQL inventory tables to Elasticsearch (RUNNING)
--------------------------------------------------------------

$ http GET "http://localhost:8081/jobs/overview" | jq '.'
{
  "jobs": [
    {
      "jid": "70eb85b2554ac2c9e31892c85bbd3687",
      "name": "Sync MySQL inventory tables to Elasticsearch",
      "start-time": 1733441346671,
      "end-time": 1733441388219,
      "duration": 41548,
      "state": "RUNNING",
[…]

Using these you can cancel a running job by passing its id (which you’ll also get from Flink CDC when it launches the pipeline):

$ ./bin/flink cancel ed609d9e2af52260ba7cc075ab6480c1
Cancelling job ed609d9e2af52260ba7cc075ab6480c1.
Cancelled job ed609d9e2af52260ba7cc075ab6480c1.

You can also cancel all running jobs on the cluster, which if you’re running on a sandbox instance and randomly jiggling things until they unbreak can be a bit of a timesaver:

# use httpie and jq to kill all running/restarting jobs on the cluster
http GET "http://localhost:8081/jobs/overview" | \
  jq -r '.jobs[] | select(.state=="RUNNING" or .state=="RESTARTING") | .jid' | \
  while read jid; do
    http PATCH "http://localhost:8081/jobs/$jid"
  done

If you look closely at the above logic, it’s not only <span class="inline-code">RUNNING</span> jobs that are matched, but also <span class="inline-code">RESTARTING</span>. One of the things that I noticed a lot was that jobs might be logically invalid (e.g. specifying a column that doesn’t exist) but don’t abort and instead simply go into a seemingly infinite <span class="inline-code">RESTARTING/RUNNING</span> loop. The problem with this is that it’s not always apparent that there’s even a problem, particularly if you happen to look at the job status when it’s <span class="inline-code">RUNNING</span> and before it’s failed. This is also where the route suffix trick from earlier earns its keep, so that you don’t have to clean up the target each time you’re iteratively testing.
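To spot that kind of crash loop without having to catch the job at just the right moment, a quick polling loop against the REST API (again with httpie and jq) does the trick:

# poll the job states every few seconds; a job that keeps flipping to
# RESTARTING is probably stuck in a crash loop
while true; do
  http GET "http://localhost:8081/jobs/overview" | \
    jq -r '.jobs[] | "\(.name): \(.state)"'
  sleep 5
done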

This sounds great. What’s the catch?

It’s still pretty early days with Flink CDC, and 3.3 is being worked on as I type, so it’ll be good to see how it shapes up.

Put bluntly, Flink CDC pipelines show a ton of potential. Is it a cool idea? Yup. Does the world need more YAML? Of course! Does it open up stream processing and integration to non-Java coders more than Flink SQL does? Definitely. Is it ready for much more than just experimentation? Well…

If your source database is MySQL, and a pipeline connector exists for your target technology, then sure, it’s worth keeping an eye on. If open source development is your thing then jump right in and get involved, because that’s what it needs. A bit more polish, and a lot more pipeline connectors.

Again, I want to be clear—this is a relatively young project. Apache Flink itself has been around for TEN years, whilst Flink CDC has been around for much less than that. As a huge fan of the open source world, I’ll be cheering for Flink CDC to grow and succeed. As a data engineer looking around at tools to get my job done, I’m still cautious until it’s proved itself further.

From that angle, of a data engineer evaluating a tool, one of the key areas in which it really needs to develop is connectors. It’s missing many of the common sinks such as Snowflake, Apache Iceberg, RDBMS, and so on. On the source side, I’d hope that the existing set of Flink CDC source connectors soon make it into the pipeline connector world too. Whilst you could work around a missing sink connector by using the Kafka sink pipeline connector and then taking the data from Kafka to the desired sink with Flink SQL (or Kafka Connect), this rather defeats the point of a single tool for building a pipeline declaratively. It helps a little bit, but you’re still where you are today—piecing together bits of SQL and config manually into a bespoke pipeline.
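For what it’s worth, that workaround might look something like this (a sketch only; treat the Kafka sink option names as indicative of the pipeline connector’s configuration rather than copied from a tested pipeline):

source:
  type: mysql
  hostname: mysql-server.acme.com
  port: 3306
  username: db_user
  password: Password123
  tables: inventory.\.*

sink:
  # Kafka as a stop-gap sink; a second job (Flink SQL, Kafka Connect, ...)
  # then has to move the data on to the actual target
  type: kafka
  properties.bootstrap.servers: kafka-broker.acme.com:9092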

Other things that will help it grow its adoption include:

  • Better validation of input, and cleaner errors when things don’t go right
  • Support for stateful pipelines—at the moment you can’t join or aggregate.
  • Clearer and more accurate documentation, both for specifics and also the project overall. It took me writing this post to really get my head around the real power of Flink CDC pipelines and what they actually are (and aren’t).
  • The ops side of things seems a bit rudimentary. After compilation it’s “fire & forget”, leaving you to poke around the standard Flink job management interfaces. This is perhaps “icing on the cake”, but if it’s simplifying building jobs, why not simplify running them too?
  • Support for Debezium 3.0 and its richer functionality including better snapshotting, improved performance, and richer support for things like Postgres replication slots on read replicas.

Ultimately, Flink CDC brings a really rich set of functionality, so let’s hope it can really develop and thrive as part of the Apache Flink ecosystem.


If you’d like to try Flink CDC out for yourself, you can find a Docker Compose file and full details on GitHub.

Robin Moffatt

Robin is a Principal DevEx Engineer at Decodable. He has been speaking at conferences since 2009 including QCon, Devoxx, Strata, Kafka Summit, and Øredev. You can find many of his talks online and his articles on the Decodable blog as well as his own blog.

Outside of work, Robin enjoys running, drinking good beer, and eating fried breakfasts—although generally not at the same time.

Flink CDC is an interesting part of Apache Flink that I’ve been meaning to take a proper look at for some time now. Originally created by Ververica in 2021 and called “CDC Connectors for Apache Flink”, it was donated to live under the Apache Flink project in April 2024.

In this post I’m going to look at what Flink CDC actually is (because it took me a while to properly grok it), and consider where it fits into the data engineers’ toolkit. I’m looking at this from a non-coding point of view—SQL and YAML is all we need, right? 😉 For Java and PyFlink your mileage may vary (YMMV) on this, but I still think Flink CDC has a strong home even in this part of the ecosystem for some use cases.

Let’s start with what Flink CDC does. It provides two distinct features:

  • Pipeline Connectors to create CDC pipelines declaratively from YAML. This is really powerful and its value shouldn’t be underestimated.
  • A set of CDC source connectors. These are also pretty cool, but in a sense “just” connectors.

The relationship between these two is somewhat lop-sided: whilst the pipelines rely on a CDC source connector, not all CDC source connectors have a corresponding pipeline connector. The upshot of this is that as of Flink CDC 3.2 (early December 2024) the only source pipeline connector is for MySQL, so if you are working with a different database, you’ll have to use the respective source connector directly. The source connectors support the Flink Table API and so can be used from Flink SQL, PyFlink, and Java. Source connectors include Postgres, Oracle, MySQL, and several more.

In terms of sink pipeline connectors there is a bit more variety, with support for Elasticsearch, Kafka, Paimon, and several other technologies.

But let’s not get bogged down in connectors. The declarative pipelines feature of Flink CDC is the bit that is really important to get your head around, as it’s pretty darn impressive. Many of the gnarly or tedious (or tediously gnarly) things that come about in building a data integration pipeline such as schema evolution, full or partial database sync, primary key handling, data type translation, and transformation are all handled for you.

Here’s an example of a YAML file for a continuous sync of tables in MySQL to Elasticsearch:

pipeline:
  name: Sync MySQL inventory tables to Elasticsearch
  
source:  
  type: mysql  
  hostname: mysql-server.acme.com
  port: 3306  
  username: db_user  
  password: Password123
  tables: inventory.\.*  
  
sink:  
  type: elasticsearch
  version: 7  
  hosts: http://es-host.acme.com:9200

That’s it! And with that Flink CDC will sync all the tables under inventory schema over to corresponding indices in Elasticsearch. It’ll map primary keys to document IDs, update documents in-place if the source MySQL row changes, and so on.

Why are Flink CDC pipelines such a big deal?

Look at that YAML up there 👆

That’s all it takes to generate and run a Flink job:

./bin/flink-cdc.sh mysql-to-es.yaml
Pipeline has been submitted to cluster.
Job ID: 21fd74e8f7ed2a9cde3b7b855e1bed55 
Description: Sync MySQL inventory tables to Elasticsearch

A Flink CDC pipeline can do pretty much all the heavy lifting of data integration for you, including:

  • Matching to one, some, or all of the tables in the source database
  • Snapshotting the source tables, using CDC to capture and propagate all subsequent changes
  • Providing exactly-once semantics
  • Low latency streaming, not batch
  • Support for schema evolution
  • Data transformation
  • Data type translation

We’re going to take a look at those later in this post, but first up, let’s explore more about what it is that makes Flink CDC pipelines such a big deal.

I still don’t get it. How’s this different from Flink SQL? Or, Can’t I just use PyFlink?

I’ve written before about using Flink SQL for ETL pipelines. At a very high level, the pattern looks like this:

  1. Get your head around Flink catalogs and then try them out until you’ve decided which you’ll use. That, or just use the in-memory Flink catalog and get used to having to re-run all your DDL each time you restart your Flink SQL session…
  2. Create a Flink SQL table using the source connector. If you want data from a database this might well be one of the Flink CDC source connectors, or any other that Flink offers.
  3. Create a Flink SQL table using a sink connector.
  4. Run <span class="inline-code">INSERT INTO my_sink SELECT * FROM my_source</span>.
  5. Profit.

But…the devil is most definitely in the detail. I’m feeling benevolent so we’ll skip over #1 and assume that you have Flink catalogs completely understood and in hand. Let’s consider #2. What does that statement look like? What’s the equivalent of our Flink CDC pipeline YAML?

source:
  type: mysql  
  hostname: mysql-server.acme.com
  port: 3306  
  username: db_user  
  password: Password123
  tables: inventory.\.*

First off we need to know what the tables are, so we need to query MySQL:

$ mysql -uroot -phunter2 -e "show tables" inventory
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses           |
| customers           |
| geom                |
| orders              |
| products            |
| products_on_hand    |
+---------------------+

OK, so we’ve got six tables. When we create a Flink table we need to specify the schema. Therefore, we need the schema for each of the source tables:

$ mysqldump -uroot -phunter2 --no-data inventory
[…]
--
-- Table structure for table `customers`
--

DROP TABLE IF EXISTS `customers`;
/*!40101 SET @saved_cs_client     = @@character_set_client */;
/*!50503 SET character_set_client = utf8mb4 */;
CREATE TABLE `customers` (
  `id` int NOT NULL AUTO_INCREMENT,
  `first_name` varchar(255) NOT NULL,
  `last_name` varchar(255) NOT NULL,
  `email` varchar(255) NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `email` (`email`)
) ENGINE=InnoDB AUTO_INCREMENT=1005 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
/*!40101 SET character_set_client = @saved_cs_client */;

[…DDL for five other tables…]

Now we need to tidy the DDL up so that it’s usable in Flink SQL. We’ll ignore the <span class="inline-code">DROP TABLE</span> and the <span class="inline-code">ENGINE […]</span> bits, and add on the necessary <span class="inline-code">WITH</span> clause for the connector:

CREATE TABLE `customers` (
  `id` int NOT NULL AUTO_INCREMENT,
  `first_name` varchar(255) NOT NULL,
  `last_name` varchar(255) NOT NULL,
  `email` varchar(255) NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `email` (`email`)
) WITH (
       'connector' = 'mysql-cdc',
       'hostname' = 'mysql',
       'port' = '3306',
       'username' = 'db_user',
       'password' = 'Password123',
       'database-name' = 'inventory',
       'table-name' = 'customers');

The trouble is, it needs more tidy-up than that:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "AUTO_INCREMENT" at line 2, column 21.

OK, strip that out. Still not enough:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "KEY" at line 7, column 10.

Let’s ditch the <span class="inline-code">KEY</span> stuff then, which seems to work:

[INFO] Execute statement succeed.

…except that it doesn’t:

Flink SQL> SELECT * FROM customers;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: 'scan.incremental.snapshot.chunk.key-column' is required for table without primary key when 'scan.incremental.snapshot.enabled' enabled.

So with the primary key added back in (along with <span class="inline-code">NOT ENFORCED</span>), the successful statement looks like this:

CREATE TABLE `customers` (  
  `id` int NOT NULL,  
  `first_name` varchar(255) NOT NULL,  
  `last_name` varchar(255) NOT NULL,  
  `email` varchar(255) NOT NULL,  
  PRIMARY KEY (`id`) NOT ENFORCED)   
  WITH (  
     'connector' = 'mysql-cdc',  
     'hostname' = 'mysql',  
     'port' = '3306',  
     'username' = 'db_user',  
     'password' = 'Password123',  
     'database-name' = 'inventory',  
     'table-name' = 'customers');

Let’s verify that the data is being fetched from MySQL:

Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';  
[INFO] Execute statement succeed.  
  
Flink SQL> SELECT * FROM customers;  
+----+-------+------------+------------+-----------------------+  
| op |    id | first_name |  last_name |                 email |  
+----+-------+------------+------------+-----------------------+  
| +I |  1002 |     George |     Bailey |    gbailey@foobar.com |  
| +I |  1003 |     Edward |     Walker |         ed@walker.com |  
| +I |  1001 |      Sally |     Thomas | sally.thomas@acme.com |  
| +I |  1004 |       Anne |  Kretchmar |    annek@noanswer.org |

With the source table (one table of the six in MySQL, remember) let’s assume the others will be fine (narrator: they weren’t; turns out <span class="inline-code">ENUM</span> and <span class="inline-code">GEOMETRY</span> also need some manual intervention) and look at the sink end of the pipeline. This, in theory, should be simpler. Right?

Our equivalent of the Flink CDC pipeline YAML...

sink:  
  type: elasticsearch
  version: 7  
  hosts: http://es-host.acme.com:9200

...is a series of Flink SQL tables using the Elasticsearch connector to stream data from the corresponding source Flink SQL table. Continuing with the <span class="inline-code">customers</span> example that we eventually successfully created above, the Elasticsearch sink looks like this:

CREATE TABLE `es_customers` 
    WITH ('connector'='elasticsearch-7',
          'hosts'='http://es-host.acme.com:9200',
          'index'='customers')
    AS SELECT * FROM customers;

Good ’ole <span class="inline-code">CREATE TABLE…AS SELECT</span>, or <span class="inline-code">CTAS</span> as it’s affectionately known. The problem is, it’s not this easy. To start with everything looks just great - here’s one of the MySQL rows as a document in Elasticsearch:

$ http -b "localhost:9200/customers/_search?size=1&filter_path=hits.hits"

If you’re super eagle-eyed perhaps you can spot the problem. I wasn’t and didn’t, and as a result assumed that when I updated MySQL:

$ mysql -uroot -phunter2 -e "UPDATE customers SET last_name='Astley' WHERE id=1004;" inventory

I would not only see it reflected in the Flink SQL table, which I did (as a pair of change records):

Flink SQL> SELECT * FROM customers;  
+----+-------+------------+-----------+-----------------------+  
| op |    id | first_name | last_name |                 email |  
+----+-------+------------+-----------+-----------------------+  
[…]
| -U |  1004 |       Anne | Kretchmar |    annek@noanswer.org |  
| +U |  1004 |       Anne |    Astley |    annek@noanswer.org |

but also in Elasticsearch, with the document showing the update. But instead I got this:

$ http -b POST "localhost:9200/customers/_search" \  
    Content-Type:application/json \  
    query:='{ "match": { "id": "1004" } }'
{  
    "_id": "D7inlpMB3OCvuWyIjxW8",  
    "_index": "customers",  
    "_score": 1.0,  
    "_source": {  
        "email": "annek@noanswer.org",  
        "first_name": "Anne",  
        "id": 1004,  
        "last_name": "Kretchmar"  
    },  
    "_type": "_doc"  
},  
{  
    "_id": "E7iylpMB3OCvuWyI8xXj",  
    "_index": "customers",  
    "_score": 1.0,  
    "_source": {  
        "email": "annek@noanswer.org",  
        "first_name": "Anne",  
        "id": 1004,  
        "last_name": "Astley"  
    },  
    "_type": "_doc"  
}

Instead of updating the document in place, I got a new document appended to the index. The reason was that the Flink SQL table with the Elasticsearch sink didn’t have the primary key propagated to it as part of the <span class="inline-code">CREATE TABLE…AS SELECT</span>. You can verify this from Flink SQL—note the empty key column:

Flink SQL> DESCRIBE es_customers;
+------------+--------------+-------+-----+--------+-----------+
|       name |         type |  null | key | extras | watermark |
+------------+--------------+-------+-----+--------+-----------+
|         id |          INT | FALSE |     |        |           |
| first_name | VARCHAR(255) | FALSE |     |        |           |
|  last_name | VARCHAR(255) | FALSE |     |        |           |
|      email | VARCHAR(255) | FALSE |     |        |           |
+------------+--------------+-------+-----+--------+-----------+
4 rows in set

Since there’s no primary key, Elasticsearch is just creating its own key for each document created, and thus the <span class="inline-code">UPDATE</span> doesn’t propagate as we’d expect.

The fix is therefore to make sure there is a primary key declared. We can’t do this with CTAS itself:

CREATE TABLE `es_customers` (
    `id` int NOT NULL,  
    `first_name` varchar(255) NOT NULL,  
    `last_name` varchar(255) NOT NULL,  
    `email` varchar(255) NOT NULL,  
    PRIMARY KEY (`id`) NOT ENFORCED
    )
    WITH ('connector'='elasticsearch-7',
          'hosts'='http://es-host.acme.com:9200',
          'index'='customers')
    AS SELECT * FROM customers;
[ERROR] Could not execute SQL statement. Reason:  
org.apache.flink.sql.parser.error.SqlValidateException:
CREATE TABLE AS SELECT syntax does not support to specify explicit columns yet.

So instead we have to split it into a <span class="inline-code">CREATE</span> and then an <span class="inline-code">INSERT</span>:

Flink SQL> DROP TABLE `es_customers`;
Flink SQL> CREATE TABLE `es_customers` (
    `id` int NOT NULL,  
    `first_name` varchar(255) NOT NULL,  
    `last_name` varchar(255) NOT NULL,  
    `email` varchar(255) NOT NULL,  
    PRIMARY KEY (`id`) NOT ENFORCED
    )
    WITH ('connector'='elasticsearch-7',
          'hosts'='http://es-host.acme.com:9200',
          'index'='customers01');
[INFO] Execute statement succeed.  
  
Flink SQL> INSERT INTO es_customers SELECT * FROM customers ;  

[INFO] Submitting SQL update statement to the cluster...  
[INFO] SQL update statement has been successfully submitted to the cluster:  
Job ID: a1a7b612ebe8aa9d07636a325cfbdf84

(Note that I’ve taken the lazy route and sent the data for this updated approach to a new Elasticsearch index, <span class="inline-code">customers01</span>, to avoid further complications.)

Now the <span class="inline-code">id</span> field is used as the <span class="inline-code">document _id</span>:

{
    "_id": "1004",
    "_index": "customers01",
    "_score": 1.0,
    "_source": {
        "email": "annek@noanswer.org",
        "first_name": "Anne",
        "id": 1004,
        "last_name": "Astley"
    },
    "_type": "_doc"
}

and when the record is updated in MySQL:

$ mysql mysql -uroot -phunter2 -e "UPDATE customers SET first_name='Rick' WHERE id=1004;" inventory

the document in Elasticsearch gets updated correctly too:

$ http -b POST "localhost:9200/customers01/_search" \
    Content-Type:application/json \
    query:='{ "match": { "id": "1004" } }'
[…]
    "hits": {
        "hits": [
            {
                "_id": "1004",
                "_index": "customers01",
                "_score": 1.0,
                "_source": {
                    "email": "annek@noanswer.org",
                    "first_name": "Rick",
                    "id": 1004,
                    "last_name": "Kretchmar"
                },
[…]

Phew, we got there in the end, right?! 😅 But that was one table. Of six. And all we’re doing is handling the relatively straightforward business of primary key propagation. This example in itself shows how Flink CDC pipelines are going to save orders of magnitude of engineering effort when it comes to building pipelines. Just to remind ourselves, this is how we accomplish all of what we did here (and more, since we only actually worked through one table!), in YAML:

pipeline:
  name: Sync MySQL inventory tables to Elasticsearch
  
source:  
  type: mysql  
  hostname: mysql-server.acme.com
  port: 3306  
  username: db_user  
  password: Password123
  tables: inventory.\.*  
  
sink:  
  type: elasticsearch
  version: 7  
  hosts: http://es-host.acme.com:9200

Flink CDC is solving a common problem

Before the advent of Flink CDC 3.0 and its notion of end-to-end pipelines, maintaining this kind of data flow was much more cumbersome, requiring to define a potentially large number of bespoke source and sink tables by hand. In Decodable, we have simplified this set-up via things like multi-stream connectors and declarative resource management, and it’s great to see that similar concepts are available now in Flink CDC, substantially reducing the effort for creating and operating comprehensive data pipelines.

Digging into Flink CDC pipelines in more depth

Schema evolution

Just as data doesn’t stand still and that’s why we use streams, the shape of data is always changing too. If we’ve got a process replicating a set of tables from one place to another, we need to know that it’s going to handle changes happening to the schema.

Let’s imagine we add a field to the <span class="inline-code">customers</span> table:

mysql> use inventory;
Database changed

mysql> ALTER TABLE customers ADD country VARCHAR(255);
Query OK, 0 rows affected (0.06 sec)
Records: 0  Duplicates: 0  Warnings: 0

mysql> UPDATE customers SET country='UK' WHERE id=1001;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1  Changed: 1  Warnings: 0

How does the Flink CDC pipeline that we set running earlier handle it? Well, it doesn’t crash, which is always a nice start:

Not only that, but it seamlessly propagates the column change and update to Elasticsearch:

❯ http -b localhost:9200/inventory.customers/_doc/1001
{
    "_id": "1001",
    "_index": "inventory.customers",
    "_primary_term": 1,
    "_seq_no": 4,
    "_source": {
        "country": "UK",
        "email": "sally.thomas@acme.com",
        "first_name": "Sally",
        "id": 1001,
        "last_name": "Thomas"
    },
    "_type": "_doc",
    "_version": 2,
    "found": true
}

To do this with Flink SQL would be messy. In fact, I would need to go and learn how to do it (and first, determine if it’s even possible). You’d want to be able to preserve the state of the existing job so that you’re not unnecessarily reprocessing records. Then you’d need to drop the existing source table, create the new table with the updated schema, and then run the <span class="inline-code">INSERT INTO sink_table</span> again, hopefully restoring the state from the previous incarnation.

Perhaps you don’t want schema changes to persist downstream, particularly if they’re destructive (such as removing a column). Flink CDC pipelines can be customised to control how schema evolution is handled (including ignoring it entirely):

pipeline:
  # Don't pass through any schema changes to the sink
  schema.change.behavior: ignore

You can also customise at the sink level how individual types of schema change event are processed by the sink:

sink:
  type: elasticsearch
  version: 7  
  hosts: http://es-host.acme.com:9200  
  exclude.schema.changes: truncate.table

Routing and renaming tables

By default, Flink CDC pipelines will use the source table name as the target too. In our example <span class="inline-code">inventory.customers</span> in MySQL is written to Elasticsearch as exactly that—an index called <span class="inline-code">inventory.customers</span>.

Using the route YAML configuration you can customise the name of the target object, and through the same method, fan-in multiple source tables to a single target (by setting all the sources to a single target object name). You can also selectively rename certain tables matching a pattern. Here’s an example which will add a prefix to any table matching the <span class="inline-code">inventory.product*</span> pattern:

route:
  - source-table: inventory.product\.*
    sink-table: routing_example__$
    replace-symbol: $
    description: > 
        route all tables that begin with 'product'
        to a target prefixed with 'routing_example__'

Which from a set of MySQL tables like this:

mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses           |
| customers           |
| geom                |
| orders              |
| products            |
| products_on_hand    |
+---------------------+
6 rows in set (0.00 sec)

is streamed into these Elasticsearch indices—notice the <span class="inline-code">routing_example__</span> prefix on the two <span class="inline-code">product*</span> tables:

❯ http -b "localhost:9200/_cat/indices/?h=index"
inventory.geom
routing_example__products
inventory.customers
inventory.addresses
routing_example__products_on_hand
inventory.orders

Unlike things like schema evolution, the routing and renaming of tables is something that’s handled in a fairly straightforward way with Flink SQL; you’d change the target object name as part of the DDL. For routing multiple tables into one you have multiple <span class="inline-code">INSERT INTO</span> statements, or a single <span class="inline-code">INSERT</span> made up of a series of <span class="inline-code">UNION</span>s. It’s not as elegant or self-explanatory as declarative YAML, but it’s not the eye-wateringly painful process of setting up multiple tables into the pipeline in the first place.

One more use of route to mention is if you’re iteratively developing a pipeline and don’t want to have to clean out the target system each time you restart it. This YAML will add a suffix to each table name from the inventory database when it’s written to the sink. Each time you have a new test version of the pipeline, bump the suffix and it’ll write to a new set of tables.

route:
  - source-table: inventory.\.*
    sink-table: $-01
    replace-symbol: $
    description: Add a suffix to all tables

Transforms

As with routing, transforms kind of come as part and parcel of using Flink SQL to build a pipeline. If you’re already having to write a <span class="inline-code">CREATE TABLE</span> and <span class="inline-code">INSERT INTO</span> manually, then customising these to apply transformations is pretty standard. Say you want to create a new field that’s a composite of two others, in Flink SQL you’d do this:

-- Declare the table as before, but adding a new column
-- to hold the computed field ('new_field')
CREATE TABLE source_tbl (existing_field VARCHAR(255),
                         …,
                         …,
                         new_field VARCHAR(255),
                         PRIMARY KEY ('key_field') NOT ENFORCED)
        WITH ('connector-config' = 'goes here'
              …)

-- Run the INSERT INTO and add the expression to compute the new field
INSERT INTO target SELECT existing_field,
                          …,
                          …,
                          src_field1 || ' ' || src_field_2 AS new_field
                     FROM src_table;

An actual example of this in practice joining the <span class="inline-code">first_name</span> and <span class="inline-code">last_name</span> fields in the <span class="inline-code">customers</span> table in Flink CDC pipeline’s transform YAML is the somewhat less cumbersome and error-prone declarative instruction: 

transform:
  - source-table: inventory.customers
    projection: \*, first_name || ' ' || last_name as full_name
    description: Concatenate name fields together in customers table

Note the <span class="inline-code">\*</span> — this is adding all the existing fields to the projection (it’s a <span class="inline-code">*</span> escaped with a <span class="inline-code">\</span>). Without it the only field that would be included from <span class="inline-code">customers</span> would be the new one. Put another way, projection is basically whatever you’d write in your <span class="inline-code">SELECT</span> (which is indeed what a <span class="inline-code">SELECT</span> is—the projection clause of a SQL statement).

If you go and look in the Flink Web UI you’ll see an additional Transform operator has been added:

As well as adding fields with a transformation, you can remove them too, by omitting them from the projection list. For example, if a <span class="inline-code">customer_details</span> source table has the fields<span class="inline-code">account_id, update_ts, social_security_num</span> and you want to omit the personally identifiable information (PII), you could do this in the pipeline with:

transform:
  - source-table: inventory.customer_details
    projection: >
      account_id,
      update_ts
    description: Don’t PII in the pipeline

The same principle applies if you’re doing this in Flink SQL, as shown below.
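As a rough sketch (with assumed data types, and the same placeholder connector config as the earlier example), you’d simply leave <span class="inline-code">social_security_num</span> out of both the target table declaration and the <span class="inline-code">SELECT</span>:

-- Declare the target without the PII column…
CREATE TABLE customer_details_no_pii (account_id BIGINT,
                                      update_ts TIMESTAMP(3),
                                      PRIMARY KEY (account_id) NOT ENFORCED)
        WITH ('connector-config' = 'goes here');

-- …and simply don't select it
INSERT INTO customer_details_no_pii
    SELECT account_id, update_ts FROM customer_details;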

Unfortunately, neither Flink SQL nor Flink CDC pipelines support an exclusion parameter for projections in the way that some engines (including Snowflake and DuckDB) do, which means you can’t write something like <span class="inline-code">SELECT * EXCLUDE social_security_num FROM customer_details</span>. This makes the pipeline a bit more brittle than is ideal, since you’d need to modify it every time a new non-PII column was added to the schema.

You can also use transformations to lift metadata from the pipeline rows into the target table:

transform:
  - source-table: inventory.\.*
    projection: >
      \*,
      __table_name__,  __namespace_name__,  __schema_name__,  __data_event_type__
    description: Add metadata to all tables from inventory db

Note that combining transforms can get a bit problematic if more than one of them modifies the projection for the same table; if in doubt, check the Flink logs for errors.
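For example, in a configuration like this (a sketch built from the earlier examples), <span class="inline-code">inventory.customers</span> matches both rules, so both are trying to change its projection:

transform:
  - source-table: inventory.customers
    projection: \*, first_name || ' ' || last_name as full_name
    description: Add full_name to customers
  - source-table: inventory.\.*
    projection: \*, __table_name__
    description: Add table name metadata to all tables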

Some notes about running Flink CDC Pipelines

As I’ll discuss later in this post, there are still some gaps in Flink CDC. One of those is the ops side of things. Running a pipeline is simple enough:

$ ./bin/flink-cdc.sh mysql-to-es.yaml

But then what? If the pipeline fails to compile, you’ll get an error back at the console. But assuming it compiles, it still might not work. From this point on it’s simply a Flink job, to inspect and manage like any other. You’ve got the built-in Flink Web UI, as well as CLI and REST API options:

$ ./bin/flink list -r

------------------ Running/Restarting Jobs -------------------
05.12.2024 22:14:44 : ed609d9e2af52260ba7cc075ab6480c1 : Sync MySQL inventory tables to Elasticsearch (RUNNING)
--------------------------------------------------------------

$ http GET "http://localhost:8081/jobs/overview" | jq '.'
{
  "jobs": [
    {
      "jid": "70eb85b2554ac2c9e31892c85bbd3687",
      "name": "Sync MySQL inventory tables to Elasticsearch",
      "start-time": 1733441346671,
      "end-time": 1733441388219,
      "duration": 41548,
      "state": "RUNNING",
[…]

Using these you can cancel a running job by passing its id (which you’ll also get from Flink CDC when it launches the pipeline):

$ ./bin/flink cancel ed609d9e2af52260ba7cc075ab6480c1
Cancelling job ed609d9e2af52260ba7cc075ab6480c1.
Cancelled job ed609d9e2af52260ba7cc075ab6480c1.

You can also cancel all running jobs on the cluster which, if you’re running on a sandbox instance and randomly jiggling things until they unbreak, can be a bit of a timesaver:

# use httpie and jq to kill all running/restarting jobs on the cluster
http GET "http://localhost:8081/jobs/overview" | \
  jq -r '.jobs[] | select(.state=="RUNNING" or .state=="RESTARTING") | .jid' | \
  while read jid; do
    http PATCH "http://localhost:8081/jobs/$jid"
  done

If you look closely at the above logic, it’s not only <span class="inline-code">RUNNING</span> jobs that are matched, but also <span class="inline-code">RESTARTING</span> ones. One thing I noticed a lot was that jobs that are logically invalid (e.g. specifying a column that doesn’t exist) don’t abort; instead they go into a seemingly infinite <span class="inline-code">RESTARTING</span>/<span class="inline-code">RUNNING</span> loop. The problem with this is that it’s not always apparent that there’s even a problem, particularly if you happen to look at the job status while it’s <span class="inline-code">RUNNING</span> and before it’s failed again. (This is also where the <span class="inline-code">route</span> trick from earlier comes in handy, so that you don’t have to clean up the target each time you’re iteratively testing.)
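If you suspect a job is flapping like this, one way to catch it in the act is to poll its state for a short while rather than relying on a single glance. Here’s a quick sketch using the same httpie/jq approach as above (the job ID is just the one from the earlier example):

# poll the job's state every few seconds; a job stuck in a RESTARTING/RUNNING
# loop will flip back and forth rather than settling on RUNNING
while true; do
  http GET "http://localhost:8081/jobs/ed609d9e2af52260ba7cc075ab6480c1" | jq -r '.state'
  sleep 5
done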

This sounds great. What’s the catch?

It’s still pretty early days with Flink CDC, and 3.3 is being worked on as I type, so it’ll be good to see how it shapes up.

Put bluntly, Flink CDC pipelines show a ton of potential. Is it a cool idea? Yup. Does the world need more YAML? Of course! Does it open up stream processing and integration to non-Java coders more than Flink SQL does? Definitely. Is it ready for much more than just experimentation? Well…

If your source database is MySQL, and a pipeline connector exists for your target technology, then sure, it’s worth keeping an eye on. If open source development is your thing then jump right in and get involved, because that’s what it needs. A bit more polish, and a lot more pipeline connectors.

Again, I want to be clear—this is a relatively young project. Apache Flink itself has been around for TEN years, whilst Flink CDC has been around for much less than that. As a huge fan of the open source world, I’ll absolutely be cheering for Flink CDC to grow and succeed. As a data engineer looking around at tools to get my job done, I’m still cautious until it’s proved itself further.

From that angle, of a data engineer evaluating a tool, one of the key areas in which it really needs to develop is connectors. It’s missing many of the common sinks, such as Snowflake, Apache Iceberg, RDBMS, etc. On the source side, I’d hope that the existing set of Flink CDC source connectors soon makes its way into the pipeline connector world too. Whilst you could work around a missing sink connector by using the Kafka sink pipeline connector and then taking the data from Kafka to the desired sink with Flink SQL (or Kafka Connect), this rather defeats the point of a single tool for building a pipeline declaratively. It helps a little, but you’re still where you are today—piecing together bits of SQL and config manually into a bespoke pipeline.

Other things that will help it grow its adoption include:

  • Better validation of input, and cleaner errors when things don’t go right
  • Support for stateful pipelines—at the moment you can’t join or aggregate.
  • Clearer and more accurate documentation, both for specifics and also the project overall. It took me writing this post to really get my head around the real power of Flink CDC pipelines and what they actually are (and aren’t).
  • The ops side of things seems a bit rudimentary. After compilation it’s “fire & forget”, leaving you to poke around the standard Flink job management interfaces. This is perhaps “icing on the cake”, but if it’s simplifying building jobs, why not simplify running them too?
  • Support for Debezium 3.0 and its richer functionality including better snapshotting, improved performance, and richer support for things like Postgres replication slots on read replicas.

Ultimately, Flink CDC brings a really rich set of functionality, so let’s hope it can really develop and thrive as part of the Apache Flink ecosystem.


If you’d like to try Flink CDC out for yourself, you can find a Docker Compose file and full details on GitHub.

