June 27, 2023
4 min read

Re-keying a Kafka Topic

We are excited to share another episode of Data Streaming Quick Tips by Gunnar Morling. The goal of this series is to explain one common task or challenge in the space of stream processing, for example joining data from multiple sources, running time-windowed aggregations, establishing data configurations, and much more. The series will cover upstream open source technologies like Apache Flink, Apache Kafka, Flink SQL, Debezium, as well as Decodable’s approach to stream processing. The episodes will be short and crisp, around 5 to 10 minutes.

Our second topic is Re-keying a Kafka Topic. This article summarizes the video and covers both how—and why—to re-key a Kafka topic, what that’s all about, and how Decodable can help you accomplish that.

Why Re-Key a Kafka Topic

At its core, re-keying a Kafka topic is simply the process of taking data from one Kafka topic, changing its key definition, and then writing the data back into another Kafka topic.

Before diving into the details of how to re-key a Kafka topic, let's briefly examine why you would want to do this in the first place. To understand things better, let's look at an example. Let's assume we have a Kafka topic with records that represent geometric objects with properties like color and shape. If you put those items onto a Kafka topic, there might be records like a green diamond, a blue square, a purple circle, and so on.

One of the defining features of Kafka is its scalability, which means the data from a single Kafka topic can be spread out across multiple brokers within a Kafka cluster. To see how this works, let's take a look under the hood. Each Kafka topic is organized into partitions, each of which takes a share of the data. This is where the notion of the Kafka record key comes into the picture: it determines which partition a specific record is sent to.

In our example, the object’s shape is used as the partition key. The Kafka partitioner calculates a hash of the key value, which then determines the partition where the record is stored. For example, all objects with a square or circle shape are sent to partition 0, all triangular or hexagonal objects go to partition 1, and all the diamond objects go to partition 2.
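The key-to-partition mapping described above can be sketched in a few lines of Python. This is only an illustration: it uses CRC32 as a stand-in hash, whereas the Java client's default partitioner applies a murmur2 hash to the serialized key, so the partition numbers printed here will not match a real cluster or the example above. The function name `partition_for` is made up for this sketch.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Map a record key to a partition index.

    CRC32 is a stand-in hash chosen for this sketch; it is not the hash
    that Kafka's own partitioner uses.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# The same key always yields the same partition for a fixed partition count.
for shape in ["square", "circle", "triangle", "hexagon", "diamond"]:
    print(shape, "->", partition_for(shape, 3))
```

The point to take away is that the mapping depends only on the key and the number of partitions, which is why records with equal keys always end up together.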

Distributing the data across multiple partitions allows you to scale out your load to multiple broker nodes in a cluster, but it also comes with some important semantic implications for your Kafka consumers. Those consumers are typically organized in groups with each group member subscribing to a subset of the available partitions. In our example, consumer 1 is subscribed to partitions 0 and 1, and consumer 2 is subscribed to partition 2. This means that each consumer will receive the records with the same key in the exact same order as they have been produced. It also means that all records with the same key will be processed by the same consumer.
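As a rough illustration of these two guarantees, the following sketch simulates partition logs in memory and hands them to consumers using the assignment from the example. It does not talk to Kafka; the hash (CRC32) and the names `ASSIGNMENT`, `partition_for`, and `deliver` are assumptions made for this sketch.

```python
import zlib
from collections import defaultdict

# Assignment from the example: consumer 1 reads partitions 0 and 1,
# consumer 2 reads partition 2.
ASSIGNMENT = {0: "consumer-1", 1: "consumer-1", 2: "consumer-2"}


def partition_for(key: str) -> int:
    # Stand-in hash (CRC32), not Kafka's actual partitioner.
    return zlib.crc32(key.encode("utf-8")) % len(ASSIGNMENT)


def deliver(records):
    """Append each (key, value) to its partition log, then group logs by consumer."""
    logs = defaultdict(list)
    for key, value in records:
        logs[partition_for(key)].append((key, value))
    received = defaultdict(list)
    for partition, log in sorted(logs.items()):
        received[ASSIGNMENT[partition]].extend(log)
    return dict(received)


# Values act as sequence numbers per key, in production order.
produced = [("square", 1), ("diamond", 1), ("square", 2), ("diamond", 2), ("square", 3)]
received = deliver(produced)
print(received)
```

Whichever consumer ends up owning a key sees that key's records in the order they were produced, and no other consumer sees them.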

But what should we do if we want to make sure that all the orange or all the purple objects are processed by the same consumer? As the topic is partitioned by shape, the records with the same color are distributed across all the partitions, and thus no ordering per color is guaranteed. This means we need to re-key our topic. In other words, modify the key for each record and write that data back into another topic.
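To make the difference concrete, here is a small simulation that records which partitions each color lands in, first when records are keyed by shape and then when they are keyed by color. As before, CRC32 stands in for Kafka's real hash, and the record layout and helper names are assumptions for this sketch.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 3


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    # Stand-in hash (CRC32), not Kafka's actual partitioner.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def partitions_per_color(records, key_field):
    """Return {color: set of partitions} when records are keyed by key_field."""
    seen = defaultdict(set)
    for record in records:
        seen[record["color"]].add(partition_for(record[key_field]))
    return dict(seen)


records = [
    {"shape": shape, "color": color}
    for shape in ["square", "circle", "triangle", "hexagon", "diamond"]
    for color in ["orange", "purple", "green"]
]

by_shape = partitions_per_color(records, "shape")
by_color = partitions_per_color(records, "color")
print("keyed by shape:", by_shape)
print("keyed by color:", by_color)
```

Keyed by color, every color maps to exactly one partition. Keyed by shape, a color appears in every partition that any of its shapes hashes to, which is the situation re-keying is meant to fix.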

The question now is, how do you implement such a re-keying process? We could implement a bespoke microservice using the Kafka consumer and producer APIs, but then we would be faced with the operational overhead of running the service. Similarly, we could use a stream processing API such as Kafka Streams or the Flink Java API to implement the re-keying logic. But again, we would have to operate this service or a Flink cluster. So instead, let's take a look at how to re-key a Kafka topic using Decodable.
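For a sense of what the bespoke option involves, the core transformation of such a service might look like the sketch below. It assumes record values are JSON objects and works on in-memory (key, value) pairs; in a real service the input would come from a Kafka consumer's poll loop and each output pair would be sent with a producer, neither of which is shown here. The `rekey` function and the sample records are hypothetical.

```python
import json
from typing import Iterable, Iterator, Optional, Tuple

# (key, value) as raw bytes, mirroring how Kafka stores records.
Record = Tuple[Optional[bytes], bytes]


def rekey(records: Iterable[Record], key_field: str) -> Iterator[Record]:
    """Yield each record with its key replaced by the value's key_field.

    Assumes every value is a JSON object that contains key_field.
    """
    for _old_key, value in records:
        payload = json.loads(value)
        new_key = str(payload[key_field]).encode("utf-8")
        yield new_key, value  # the value itself is passed through unchanged


# Hypothetical input keyed by shape.
source = [
    (b"square", json.dumps({"shape": "square", "color": "orange"}).encode("utf-8")),
    (b"circle", json.dumps({"shape": "circle", "color": "orange"}).encode("utf-8")),
]
rekeyed = list(rekey(source, "color"))
print([key for key, _ in rekeyed])
```

The transformation itself is small; the operational cost mentioned above comes from everything around it, such as deployment, scaling, offset management, and error handling.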

Kafka Source Connector

In order to make things a bit more tangible, let’s look at a small example application which uses the Kafka producer API for persisting records with purchase orders. Here we’ll use the order ID as the record key, and then the value is just some random data. Taking a look into the corresponding Kafka topic, we can see the data, the Kafka record key, the value, and the partition and offsets.

While the data is keyed by order ID at this point, let's assume we want to make sure that all the purchase orders for a given product ID are processed by the same consumer, for instance to make sure that only a single writer updates inventory counts. In order to achieve this, we must re-key the Kafka topic.

To do this quickly and easily, let’s go to Decodable Web and create a connection to our purchase orders topic using the Kafka Source connector. When setting up the connector, we specify all the information like the source URL, the topic name, security details, and credentials. The data is sent to a stream in Decodable. The stream’s schema must match the schema of the data in the Kafka topic, so make sure that you’ve specified all of the field names and data types that are present in the purchase order records. Once the connection is created, let’s go ahead and start it.

Kafka Sink Connector

With the source connector set up, it's time to set up a Kafka sink connector. This will take the data from the Decodable stream, create a new record key, and then write the data back into another Kafka topic using this new key. As an alternative to using Decodable Web, we can also use the Decodable Command Line Interface (Decodable CLI). The Decodable CLI provides commands like connection list or stream list, which show the set of existing connections or streams. We can set up a new connection and specify all the relevant information, like the schema of this connection, the bootstrap server, security details, credentials, and so on.

And very importantly for our goal today, we specify that the product ID should be used as the sole key field. While it’s possible to have multiple fields in a Kafka record key, in this case we’re just using one. This is exactly the core part where we re-key the Kafka record.

After creating and starting the new connection with a different record key, there are now two connections: a source and a sink.

Data Flow

At this point, both connections are running so we can take a look at the entire data flow in Decodable Web. Start at the source connection, and from there, navigate to the associated stream. We can run a stream preview to see that new data is coming in and from there, we can navigate to the sink connection and see from the metrics that the record count is increasing. So the data is flowing from the source topic, it is being re-keyed, and then it is being sent to a new destination Kafka topic.

As a final check, we can take a look at the data in the output Kafka topic using kcat. We can see the data with product ID as the key for the record. And taking a closer look, we can also see that all the records with the same product ID are going into the same partition exactly as we intended.
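This last check can also be automated. The sketch below assumes the topic's records have been dumped as tab-separated lines of key, partition, and offset (for example with a kcat format string along the lines of `-f '%k\t%p\t%o\n'`) and reports any key that shows up in more than one partition. The function name and the sample lines are made up for illustration; they are not output from the demo.

```python
from collections import defaultdict


def keys_spanning_partitions(lines):
    """Return {key: sorted partitions} for keys seen in more than one partition.

    Each line is expected to look like "<key>\t<partition>\t<offset>".
    """
    partitions = defaultdict(set)
    for line in lines:
        if not line.strip():
            continue  # skip blank lines
        key, partition, _offset = line.rstrip("\n").split("\t")
        partitions[key].add(int(partition))
    return {k: sorted(p) for k, p in partitions.items() if len(p) > 1}


# Hypothetical sample lines.
sample = [
    "product-1\t0\t10",
    "product-2\t2\t4",
    "product-1\t0\t11",
]
print(keys_spanning_partitions(sample))
```

An empty result means every key was found in exactly one partition, which is what a correctly re-keyed topic should show as long as the partition count has not changed.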

David Fabritius
