Back
June 13, 2023
6
min read

How to: Flink Upsert with the Kafka SQL Connector

We are excited to share the inaugural episode of Data Streaming Quick Tips here at Decodable. Kicking things off is our own Gunnar Morling, who has created the first of many in-depth demo videos. The goal of this series is to explain one common task or challenge in the space of stream processing, for example joining data from multiple sources, running time-windowed aggregations, establishing data configurations, and much more. The series will cover upstream open source technologies like Apache Flink, Apache Kafka, Flink SQL, Debezium, as well as Decodable’s approach to stream processing. The episodes will be short and crisp, around 5 to 10 minutes.

Apache Flink Kafka Upsert Connector video

Our first topic is the Upsert Kafka SQL connector for Apache Flink. This article summarizes the video and covers what the Kafka Upsert connector is, how it compares to the regular Kafka connector, and when to use which.

Kafka Upsert Connector

At its core, the Kafka Upsert connector is able to consume changelog streams, where each data record represents an update or delete event. For example, streams produced by the Flink CDC connectors based on Debezium. The Kafka Upsert connector is able to interpret such a stream and apply those changes in an upsert fashion, meaning that only the latest state per key will be emitted to a sink/destination Kafka topic.

If you have an insert event coming in produced by Debezium, then the current state of the insert will be propagated. If an update event comes in, then the after state of this update event will be propagated. And if a delete event comes in, then a tombstone event will be emitted to Kafka, which means it can be used for compaction. This behavior is in contrast to the regular Kafka connector, which emits events in append-only fashion without considering the keys.

Preparing a Demo Environment

To make it easier for you to experiment with these connectors yourself and understand them better, we have created a GitHub repository that you can use to explore how it works. The repo shows how to use Flink's Apache Kafka SQL Connector and the Upsert Kafka SQL Connector together with the Postgres CDC connector for Apache Flink (based on Debezium), with Redpanda as a data streaming platform.

The repo’s readme explains everything you need to dive in:

  • The prerequisites needed on your demo system
  • How to start up all the components using Docker Compose
  • How to ingest data from Postgres
  • How to emit data to Kafka using the Upsert connector
  • How to emit change events when using the regular Kafka connector

Setting Up a Flink CDC Connector

In order to ingest changes into Flink, you first need to set up an instance of the Flink CDC connector. In the example repository, you can find the Flink SQL code for defining a table, along with its schema and primary key, as well as the Postgres-CDC connector information.

Once the table and its connector are set up, we can take a look at the data within Flink and see that it is exactly the same as what we see in Postgres. We can also see that an insert into Postgres shows in Flink as an insert event, while an update into Postgres shows in Flink as two events: the first represents the old state of the row and the second represents the new state. A delete in Postgres simply shows as a delete event in Flink.

Propagate a Stream Using Upsert from Flink into Redpanda

We can now propagate this changelog stream from Flink into Redpanda using the Flink Upsert Kafka SQL connector. We set up another connector using the Flink SQL shell to execute a create table statement, specifying the same schema as before. We’ll use the kafka-upsert connector to connect to the local Redpanda cluster. Note that we are using JSON as the format for the data keys and values.

Flink SQL create table

This deploys a Flink job on our cluster which runs continuously. Using the Redpanda shell, we can consume from the destination topic to see the latest state per row. When inserting a new row into Postgres, we see it show up in the Redpanda topic. When updating a row in Postgres, we see an event with the latest state for that particular row in Redpanda, so it is applying upsert semantics. When deleting a row in Postgres, Redpanda will emit a tombstone event which allows the removal of this record for compacted topics.

Differences When Using the Regular Kafka Connector

You might be wondering if  we can propagate the changelog stream from Flink into Redpanda using the regular Kafka connector? Let’s create another Flink table but this time let’s use the regular Kafka connector. We’ll specify the same schema as before and connect to our local Redpanda cluster. We once again use JSON as the format for the data keys and values.

However, when we attempt to connect the source Postgres table to a new destination Redpanda table via Flink, it fails because this Kafka connector doesn't know how to handle those update and delete events from the changelog stream. We can propagate the changelog stream from Flink into Redpanda using the regular Kafka connector if we also use the Debezium-JSON format.

Create another Flink table, but now use Debezium-JSON as the format for the data keys and values. This is natively supported by the Kafka connector and by Flink CDC in general. It is now possible to emit and propagate events in the Debezium change event format. Using the Redpanda shell, we can consume from the destination topic to see the latest state per row. The change events in Redpanda look like Debezium change events. However, it's not really Debezium, but synthetic events. This becomes obvious if you do an update. Instead of an update event which has both the old and the new state of a row, you actually get two events: a delete event which has the old state of the row and a create event with the new state of the row.

Flink SQL create table

Additional Resources

📫 Email signup 👇

Did you enjoy this issue of Checkpoint Chronicle? Would you like the next edition delivered directly to your email to read from the comfort of your own home?

Simply enter your email address here and we'll send you the next issue as soon as it's published—and nothing else, we promise!

👍 Got it!
Oops! Something went wrong while submitting the form.
David Fabritius

We are excited to share the inaugural episode of Data Streaming Quick Tips here at Decodable. Kicking things off is our own Gunnar Morling, who has created the first of many in-depth demo videos. The goal of this series is to explain one common task or challenge in the space of stream processing, for example joining data from multiple sources, running time-windowed aggregations, establishing data configurations, and much more. The series will cover upstream open source technologies like Apache Flink, Apache Kafka, Flink SQL, Debezium, as well as Decodable’s approach to stream processing. The episodes will be short and crisp, around 5 to 10 minutes.

Apache Flink Kafka Upsert Connector video

Our first topic is the Upsert Kafka SQL connector for Apache Flink. This article summarizes the video and covers what the Kafka Upsert connector is, how it compares to the regular Kafka connector, and when to use which.

Kafka Upsert Connector

At its core, the Kafka Upsert connector is able to consume changelog streams, where each data record represents an update or delete event. For example, streams produced by the Flink CDC connectors based on Debezium. The Kafka Upsert connector is able to interpret such a stream and apply those changes in an upsert fashion, meaning that only the latest state per key will be emitted to a sink/destination Kafka topic.

If you have an insert event coming in produced by Debezium, then the current state of the insert will be propagated. If an update event comes in, then the after state of this update event will be propagated. And if a delete event comes in, then a tombstone event will be emitted to Kafka, which means it can be used for compaction. This behavior is in contrast to the regular Kafka connector, which emits events in append-only fashion without considering the keys.

Preparing a Demo Environment

To make it easier for you to experiment with these connectors yourself and understand them better, we have created a GitHub repository that you can use to explore how it works. The repo shows how to use Flink's Apache Kafka SQL Connector and the Upsert Kafka SQL Connector together with the Postgres CDC connector for Apache Flink (based on Debezium), with Redpanda as a data streaming platform.

The repo’s readme explains everything you need to dive in:

  • The prerequisites needed on your demo system
  • How to start up all the components using Docker Compose
  • How to ingest data from Postgres
  • How to emit data to Kafka using the Upsert connector
  • How to emit change events when using the regular Kafka connector

Setting Up a Flink CDC Connector

In order to ingest changes into Flink, you first need to set up an instance of the Flink CDC connector. In the example repository, you can find the Flink SQL code for defining a table, along with its schema and primary key, as well as the Postgres-CDC connector information.

Once the table and its connector are set up, we can take a look at the data within Flink and see that it is exactly the same as what we see in Postgres. We can also see that an insert into Postgres shows in Flink as an insert event, while an update into Postgres shows in Flink as two events: the first represents the old state of the row and the second represents the new state. A delete in Postgres simply shows as a delete event in Flink.

Propagate a Stream Using Upsert from Flink into Redpanda

We can now propagate this changelog stream from Flink into Redpanda using the Flink Upsert Kafka SQL connector. We set up another connector using the Flink SQL shell to execute a create table statement, specifying the same schema as before. We’ll use the kafka-upsert connector to connect to the local Redpanda cluster. Note that we are using JSON as the format for the data keys and values.

Flink SQL create table

This deploys a Flink job on our cluster which runs continuously. Using the Redpanda shell, we can consume from the destination topic to see the latest state per row. When inserting a new row into Postgres, we see it show up in the Redpanda topic. When updating a row in Postgres, we see an event with the latest state for that particular row in Redpanda, so it is applying upsert semantics. When deleting a row in Postgres, Redpanda will emit a tombstone event which allows the removal of this record for compacted topics.

Differences When Using the Regular Kafka Connector

You might be wondering if  we can propagate the changelog stream from Flink into Redpanda using the regular Kafka connector? Let’s create another Flink table but this time let’s use the regular Kafka connector. We’ll specify the same schema as before and connect to our local Redpanda cluster. We once again use JSON as the format for the data keys and values.

However, when we attempt to connect the source Postgres table to a new destination Redpanda table via Flink, it fails because this Kafka connector doesn't know how to handle those update and delete events from the changelog stream. We can propagate the changelog stream from Flink into Redpanda using the regular Kafka connector if we also use the Debezium-JSON format.

Create another Flink table, but now use Debezium-JSON as the format for the data keys and values. This is natively supported by the Kafka connector and by Flink CDC in general. It is now possible to emit and propagate events in the Debezium change event format. Using the Redpanda shell, we can consume from the destination topic to see the latest state per row. The change events in Redpanda look like Debezium change events. However, it's not really Debezium, but synthetic events. This becomes obvious if you do an update. Instead of an update event which has both the old and the new state of a row, you actually get two events: a delete event which has the old state of the row and a create event with the new state of the row.

Flink SQL create table

Additional Resources

📫 Email signup 👇

Did you enjoy this issue of Checkpoint Chronicle? Would you like the next edition delivered directly to your email to read from the comfort of your own home?

Simply enter your email address here and we'll send you the next issue as soon as it's published—and nothing else, we promise!

David Fabritius