June 18, 2024
5 min read

How to get data from Apache Kafka to Apache Iceberg on S3 with Decodable

By Robin Moffatt

Apache Iceberg is an open table format. It combines the benefits of data lakes (open standards, cheap object storage) with the good things that data warehouses have, like first-class support for tables and SQL capabilities including updates to data in place, time-travel, and transactions. With the recent acquisition by Databricks of Tabular—one of the main companies that contribute to Iceberg—it’s clear that Iceberg is winning out as one of the primary contenders in this space.

With all these benefits, who wouldn't want to use Iceberg? And so getting data into it from the ubiquitous real-time streaming tool, Apache Kafka, is a common requirement. Perhaps you've got data from your microservices that use Kafka as the message broker and you want to land that data in Iceberg for analysis and audit. Maybe you're doing data movement between systems with Kafka as the message bus. Whatever the data you've got in Kafka, streaming it in real time to Iceberg is a piece of cake with Decodable.

Decodable is built on Apache Flink and Debezium, but the beauty of it being a fully managed platform is that you don't even need to know that. I mean, you can know that, because I've just told you. And in fact, you can bring your own Flink jobs and run them directly on Decodable. But I'm going off on a tangent here… To do data movement and processing with Decodable, you use a web interface, CLI, or API to define connections, and that's it. You can add some processing with SQL along the way if you want to, and if you do it's again just web, CLI, or API. Not a jot of code to be written!

So let's get started. In this scenario we've got point of sale (PoS) data from multiple stores streaming into a Kafka topic. It is at the basket level, telling us for a particular transaction who the customer was, which store it was at, and what the customer bought. We're going to stream this to Amazon S3 in Iceberg format so that we can then build some analyses against it to answer business questions such as the number of baskets processed during the day, and volume of items sold.

Step 1: Set up a connection to Kafka

Our Kafka broker has a topic called <span class="inline-code">supermarketBaskets</span> with data in each record that looks like this:

Key:
{
    "basketId": "4728c75b-ed83-eff3-9ed9-3fb11eb83443"
}
Value:
{
    "customerId": "d540b741-d188-ecd6-601d-de1b1b509502",
    "customerName": "Ardath Jenkins",
    "customerAddress": "Suite 726 730 Jeffry Ranch, East Isrealview, AR 73579",
    "storeId": "e1196df6-bb39-ab63-1082-4deafadc57f2",
    "storeName": "Leuschke-Langworth",
    "storeLocation": "Haiside",
    "products": [
        {
            "productName": "Intelligent Plastic Wallet",
            "quantity": 4,
            "unitPrice": 13.13,
            "category": "Beauty, Home & Outdoors"
        },
        {
            "productName": "Sleek Marble Shoes",
            "quantity": 5,
            "unitPrice": 3.61,
            "category": "Clothing & Games"
        }
    ],
    "timestamp": "2024-06-05T11:21:56.736+0000"
}
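
Before setting up any connections it can be handy to eyeball the topic directly. This is just a sanity check rather than part of the Decodable setup; the sketch below assumes you have kcat installed and that it can reach the same broker used in the connection that follows:

# Read a single record from the beginning of the topic, printing key and value
kcat -b my_broker:9092 -t supermarketBaskets \
     -C -o beginning -c 1 \
     -f 'Key: %k\nValue: %s\n'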

Using the Decodable CLI we'll create a source connection for ingesting the messages from the Kafka topic into Decodable:

decodable connection create                                \
    --name rmoff-kafka-basket                              \
    --type source                                          \
    --connector kafka                                      \
    --prop bootstrap.servers=my_broker:9092                \
    --prop value.format=json                               \
    --prop key.fields=basketId                             \
    --prop key.format=json                                 \
    --prop parse-error-policy=FAIL                         \
    --prop properties.auto.offset.reset=none               \
    --prop scan.startup.mode=earliest-offset               \
    --prop topic=supermarketBaskets                        \
    --prop value.fields-include=EXCEPT_KEY                 \
    --field basketId="STRING"                              \
    --field customerId="STRING"                            \
    --field customerName="STRING"                          \
    --field customerAddress="STRING"                       \
    --field storeId="STRING"                               \
    --field storeName="STRING"                             \
    --field storeLocation="STRING"                         \
    --field products="ARRAY<ROW<productName STRING, quantity INT, unitPrice DOUBLE, category STRING>>" \
    --field timestamp="STRING"

From this we get the id of the connection that’s been created:

Created connection rmoff-kafka-basket (97f8ace4)

With the connection defined we can now start it:

decodable connection activate 97f8ace4

At this point let's hop over to the Decodable web UI and look at the data being brought in.

[Screenshot: previewing the ingested basket records in the Decodable web UI]

Step 2: Set up a connection to Iceberg
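
Before creating the sink connection you need somewhere for the data and metadata to go: an S3 bucket, a Glue database to act as the Iceberg catalog database, and an IAM role that Decodable can assume. If you're starting from scratch, the first two are quick to create with the AWS CLI. Here's a rough sketch (the bucket and database names match the ones used later in this post; the IAM role and policy setup is more involved and omitted here):

# Bucket to hold the Iceberg data and metadata files
aws s3 mb s3://rmoff --region us-west-2

# Glue database that will serve as the Iceberg catalog database
aws glue create-database --database-input '{"Name": "rmoff"}' --region us-west-2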

I've already created an S3 bucket, Glue catalog, and done the necessary IAM configuration. Now I just need to tell Decodable to send the data that we're ingesting from Kafka over to Iceberg by creating a sink connection:

$ decodable connection create                                                      \
    --name rmoff-basket-iceberg                                                    \
    --type sink                                                                    \
    --connector iceberg                                                            \
    --prop catalog-database=rmoff                                                  \
    --prop catalog-table=rmoff_basket                                              \
    --prop catalog-type=glue                                                       \
    --prop format=parquet                                                          \
    --prop region=us-west-2                                                        \
    --prop role-arn=arn:aws:iam::xxxxx:role/rmoff-decodable-s3                     \
    --prop warehouse=s3://rmoff/iceberg-test/                                      \
    --stream-id $(decodable query --keep-ids --name                                \
                  $(decodable query --name rmoff-kafka-basket |                    \
                    yq '.spec.stream_name') |                                      \
                    yq '.metadata.id')                                             \
    --field basketId="STRING"                                                      \
    --field customerId="STRING"                                                    \
    --field customerName="STRING"                                                  \
    --field customerAddress="STRING"                                               \
    --field storeId="STRING"                                                       \
    --field storeName="STRING"                                                     \
    --field storeLocation="STRING"                                                 \
    --field products="ARRAY<ROW<productName STRING, quantity INT, unitPrice DOUBLE, category STRING>>" \
    --field timestamp="STRING"

One thing to point out here is that the stream id is dynamically derived; the nested command is broken out step by step in the sketch below. You can also hard-code it based on the id shown in the web UI (as shown above when previewing the data).
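
Here's the same derivation split into individual steps. These are the same CLI calls as in the one-liner above, just run separately; the yq paths assume the same output shape that the one-liner relies on:

# 1. Get the name of the stream that the Kafka source connection writes to
STREAM_NAME=$(decodable query --name rmoff-kafka-basket | yq '.spec.stream_name')

# 2. Query for that stream with ids kept in the output, and extract its id
STREAM_ID=$(decodable query --keep-ids --name "$STREAM_NAME" | yq '.metadata.id')

echo $STREAM_ID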

Created connection rmoff-basket-iceberg (003247ff)

As before, once created, we need to start the connection. Because we want to send all of the data that we've read (and are reading) from Kafka to Iceberg, we'll use <span class="inline-code">--start-position earliest</span>:

decodable connection activate 003247ff --start-position earliest

Over in the Decodable web UI we can see that the connection is running, along with metrics about the records processed.

[Screenshot: the running Iceberg sink connection and its record metrics in the Decodable web UI]

Let's check that the data is indeed being written:

$ aws s3 ls s3://rmoff/iceberg-test/rmoff.db/rmoff_basket02/
                           PRE data/
                           PRE metadata/
$ aws s3 ls s3://rmoff/iceberg-test/rmoff.db/rmoff_basket02/data/
2024-06-05 18:07:22      30440 00000-0-dd5fc5f4-9821-448a-8bf6-b3b0a4e3d267-00001.parquet
[…]
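
The metadata prefix is worth a look too, because reading the table directly with a query engine (as we'll do with DuckDB below) means pointing it at one of the *.metadata.json files stored there:

# Iceberg table metadata: the *.metadata.json files listed here are what
# DuckDB's iceberg_scan is pointed at in the next section
aws s3 ls s3://rmoff/iceberg-test/rmoff.db/rmoff_basket02/metadata/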

Using the Iceberg data

DuckDB

The wonderful thing about open formats is the proliferation of support from multiple technologies that they often prompt. Iceberg has swept through the data ecosystem, with support from distributed compute engines such as Flink and Spark, and query engines including Presto and DuckDB.
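
If you want to run the following queries yourself, DuckDB needs its iceberg and httpfs extensions loaded, plus S3 credentials, before iceberg_scan will work against S3. A minimal sketch, assuming a recent DuckDB release with the secrets manager (the key values are placeholders for your own credentials):

INSTALL iceberg;
LOAD iceberg;
INSTALL httpfs;
LOAD httpfs;

-- Register S3 credentials for this session
CREATE SECRET s3_creds (
    TYPE S3,
    KEY_ID 'your-access-key-id',
    SECRET 'your-secret-access-key',
    REGION 'us-west-2'
);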

Using DuckDB we can quickly do a row count check to make sure we're seeing what we'd expect:

SELECT COUNT(*)
FROM iceberg_scan('s3://dw/iceberg-test/rmoff.db/rmoff_basket02/metadata/00640-ed419044-046c-44c0-a20a-e51f0cce381f.metadata.json', skip_schema_inference=True);
┌──────────────┐
│ count_star() │
│    int64     │
├──────────────┤
│        20100 │
└──────────────┘
Run Time (s): real 19.045 user 2.377048 sys 9.469687

We can also sample the data:

SELECT *
FROM iceberg_scan('s3://dw/iceberg-test/rmoff.db/rmoff_basket02/metadata/00640-ed419044-046c-44c0-a20a-e51f0cce381f.metadata.json', skip_schema_inference=True)
LIMIT 1;

┌──────────────────────┬──────────────────────┬──────────────┬──────────────────────┬──────────────────────
│       basketId       │      customerId      │ customerName │   customerAddress    │       storeId
│       varchar        │       varchar        │   varchar    │       varchar        │       varchar
├──────────────────────┼──────────────────────┼──────────────┼──────────────────────┼──────────────────────
│ 4629c66e-7b28-2d3f…  │ 0e65ac50-0c12-0cbd…  │ Sarita Hane  │ Suite 069 65438 Wa…  │ 4238f9ef-0c08-a4c3…
└──────────────────────┴──────────────────────┴──────────────┴──────────────────────┴──────────────────────

Amazon Athena and QuickSight

There are a variety of tools available for working with Iceberg data to build queries. With the data in S3 and wanting a web UI (rather than a CLI, like DuckDB above) I reached for the obvious tool—Amazon Athena (which is built on PrestoDB).

Since I'm using the Glue catalog it's easy to find the table that I've loaded:

[Screenshot: the loaded table visible in the Glue catalog from the Athena query editor]

From here I can write a SQL query in Athena to look at the data and expand out the nested <span class="inline-code">products</span> field using <span class="inline-code">CROSS JOIN</span>:

[Screenshot: Athena query results with the nested products field expanded via CROSS JOIN UNNEST]

Using this query, I then put together a simple dashboard in Amazon QuickSight (again, the easiest tool to hand since I had the AWS console already open…). This uses a slightly modified version of the SQL shown above to split out the date and time components:

SELECT 
  basketid,
  customername,
  storename,
  "timestamp"  AS basket_ts,
  cast(from_iso8601_timestamp("timestamp") as date) as basket_date,
  cast(from_iso8601_timestamp("timestamp") as time) as basket_time,
  p.productName,
  p.quantity,
  p.unitPrice,
  p.category
FROM 
  "rmoff"."rmoff_basket02" 
CROSS JOIN 
  UNNEST(products) AS t(p);

Using this, I can plot the number of baskets (transactions) and products sold over time at different granularities, as well as a breakdown of unit sales by category.

[Screenshot: QuickSight dashboard showing baskets and products sold over time, plus unit sales by category]
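
If you just want the headline numbers without building a dashboard, the same questions can be answered straight from Athena. An illustrative aggregation along the same lines as the query above (baskets and items sold per day):

SELECT
  cast(from_iso8601_timestamp("timestamp") as date) AS basket_date,
  count(distinct basketid)                          AS baskets,
  sum(p.quantity)                                   AS items_sold
FROM
  "rmoff"."rmoff_basket02"
CROSS JOIN
  UNNEST(products) AS t(p)
GROUP BY 1
ORDER BY 1;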

The brilliant thing here is that this is not a batch process, in which I need to rerun a job to see the latest data. As new sales are made in the stores, the data will flow into a Kafka topic and through to Iceberg, ready to query and drive my dashboard.

Learn more

Ready to dive deeper into Flink and looking to avoid some of the more common technical pitfalls? Check out the Top 5 Mistakes When Deploying Apache Flink.

Robin Moffatt

Robin is a Principal DevEx Engineer at Decodable. He has been speaking at conferences since 2009 including QCon, Devoxx, Strata, Kafka Summit, and Øredev. You can find many of his talks online and his articles on the Decodable blog as well as his own blog.

Outside of work, Robin enjoys running, drinking good beer, and eating fried breakfasts—although generally not at the same time.
