Sometimes it's not possible to have too much of a good thing, and whilst this blog may look at first glance rather similar to the one that I published just recently, today we're looking at a 100% pure Apache solution. Because who knows, maybe you prefer rolling your own tech stack instead of letting Decodable do it for you.
Apache Iceberg is an open table format (OTF) which, along with Delta Lake, has been gaining a huge amount of traction in recent months. Supported by most of the big platforms (including now, notably, Databricks with their acquisition of Tabular), it's proving popular in "Data Lakehouse" implementations. This splices the concept of the data lake (chuck it all in an object store and worry about it later) with that of the data warehouse (y'know, a bit of data modelling and management wouldn't be entirely the worst idea ever). Apache Hudi and Apache Paimon are also part of this cohort, although with less apparent traction so far.
Let's look at implementing a very common requirement: taking a stream of data in Kafka and writing it to Iceberg. Depending on your pipeline infrastructure of choice you might do this using Kafka Connect or, as is increasingly common these days, Apache Flink, and that's what we're going to look at here. Using Flink SQL to stream data to Iceberg also gives us the advantage of being able to apply some transformation to the data directly in SQL.
tl;dr Create a Source; Create a Sink
Let's leave the messy and boring dependency prerequisites for later and jump straight to the action. For now, all you need to know about getting data from Kafka to Iceberg with Flink SQL is that you need to do the following:
- Create a Kafka source table
- Create an Iceberg sink table
- Submit an <span class="inline-code">INSERT</span> that reads data from the Kafka source and writes it to the Iceberg sink
The Kafka source needs to declare the schema of the data in the topic, its serialisation format, and then configuration details about the topic and broker:
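Something like this should do the trick. Note that this is a sketch rather than a definitive schema: the field names, broker address, and use of JSON serialisation here are illustrative assumptions.

```sql
CREATE TABLE t_k_orders (
  order_id      INT,
  customer_name STRING,
  customer_vip  BOOLEAN,
  cost          DOUBLE,
  order_ts      TIMESTAMP(3)
) WITH (
  'connector'                    = 'kafka',
  'topic'                        = 'orders',
  'properties.bootstrap.servers' = 'broker:9092',
  'scan.startup.mode'            = 'earliest-offset',
  'format'                       = 'json'
);
```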
This is for reading a Kafka topic called <span class="inline-code">orders</span> with a payload that looks like this:
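For illustration, imagine each message on the topic is a JSON document shaped roughly like this (hypothetical field names and values, matching the source table above):

```json
{
  "order_id": 1001,
  "customer_name": "St. Aldate's Coffee",
  "customer_vip": true,
  "cost": 142.50,
  "order_ts": "2024-05-21 10:15:00"
}
```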
Now to set up the Iceberg sink. We're going to combine it with an implicit <span class="inline-code">INSERT</span> and do it as a single piece of DDL. Before that, let's make sure that data actually gets written to files, by telling Flink to checkpoint periodically and flush the data to Iceberg:
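In the Flink SQL client that's a single <span class="inline-code">SET</span> statement; here I'm using a one-minute interval to match the behaviour described below:

```sql
-- Checkpoint every minute so that the Iceberg sink commits data files regularly
SET 'execution.checkpointing.interval' = '60s';
```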
All that's left to do now is create the Iceberg table, using the <span class="inline-code">CREATE TABLE … AS SELECT</span> (CTAS) syntax:
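Here's a sketch of that CTAS, assuming a Hive Metastore catalog and a MinIO bucket called <span class="inline-code">warehouse</span>; the catalog name, warehouse path, and Hive config location are illustrative assumptions:

```sql
CREATE TABLE t_i_orders
  WITH (
    'connector'     = 'iceberg',
    'catalog-type'  = 'hive',
    'catalog-name'  = 'dev',
    'warehouse'     = 's3a://warehouse',
    'hive-conf-dir' = './conf'
  )
AS
SELECT * FROM t_k_orders;
```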
This creates the table and also submits a Flink job that continuously populates it.
We're writing the Iceberg files to MinIO, which is a self-hosted S3-compatible object store. Once the job has checkpointed (which I've configured to happen every minute) I can see data and metadata files appearing in the object store.
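One way to check is with MinIO's <span class="inline-code">mc</span> client, assuming you've configured an alias (here called <span class="inline-code">minio</span>) pointing at the local instance:

```bash
mc ls --recursive minio/warehouse/
```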
So that's our write to Iceberg set up and running. From here on it's just an Iceberg table, so we can use any Iceberg-compatible tooling with it.
Examining the metadata with <span class="inline-code">pyiceberg</span>
A bunch of Parquet and Avro files in an object store aren't much use on their own, but combined they make up the Iceberg table format. A quick way to inspect them is with <span class="inline-code">pyiceberg</span>, which offers a CLI. To use it you need to connect it to a catalog. In my example I'm using the Hive Metastore, so I configure it thus:
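The configuration lives in <span class="inline-code">~/.pyiceberg.yaml</span>. Something along these lines, with the Hive Metastore URI, MinIO endpoint, and credentials shown here being illustrative values from my local stack:

```yaml
catalog:
  default:
    uri: thrift://localhost:9083
    s3.endpoint: http://localhost:9000
    s3.access-key-id: admin
    s3.secret-access-key: password
```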
Now we can find out about the Iceberg table directly:
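For example (assuming the table we created above landed in the <span class="inline-code">default</span> database):

```bash
# List the tables in the default database
pyiceberg list default

# Show the table's schema, partition spec, and other metadata
pyiceberg describe default.t_i_orders

# List the manifests and data files that make up the table
pyiceberg files default.t_i_orders
```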
Querying Iceberg data with DuckDB
Having poked around the metadata, let's query the data itself. There are a variety of engines and platforms that can read Iceberg data, including Flink (obviously!), Apache Spark, Trino, Snowflake, Presto, BigQuery, and ClickHouse. Here I'm going to use DuckDB.
We'll install the couple of extensions that are needed: <span class="inline-code">iceberg</span> (can you guess why?!) and <span class="inline-code">httpfs</span> (because we're reading data from S3/MinIO):
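In the DuckDB CLI that's:

```sql
INSTALL iceberg;
LOAD iceberg;

INSTALL httpfs;
LOAD httpfs;
```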
Then we set up the connection details for the local MinIO instance:
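The endpoint and credentials below are the illustrative values from my local stack; adjust them to match yours:

```sql
SET s3_endpoint='localhost:9000';
SET s3_access_key_id='admin';
SET s3_secret_access_key='password';
SET s3_use_ssl=false;
SET s3_url_style='path';
```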
Finally, let's query the data. There's no direct catalog support in DuckDB yet, so we have to point it directly at the Iceberg table's metadata file:
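Roughly like this; the bucket, database, table, and metadata file name below are placeholders, so substitute the path of the latest metadata file for your own table:

```sql
SELECT *
FROM iceberg_scan('s3://warehouse/default.db/t_i_orders/metadata/00002-example.metadata.json')
LIMIT 10;
```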
Back to the start
Well, that wasn't so bad, right? In fact, pretty simple, I would say. Create a source, create a sink, and off you go.
There are a few bits that I rather skimmed over in my keenness to show you the Iceberg-y goodness, and which I'm now going to cover in more detail:
- When I created the sink, the CTAS statement did a lot of heavy lifting, and it's capable of much more, so I'm going to discuss it further below.
- I presented my working Iceberg deployment as a fait accompli, when in reality it was the output of rather a lot of learning and random jiggling until I got it to work, so I'm going to explain that bit too.
All the power of SQL 💪
Let's have a look at this innocuous CTAS that we ran above:
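(Reproduced here: the illustrative sketch from above, with its assumed catalog and warehouse settings.)

```sql
CREATE TABLE t_i_orders
  WITH (
    'connector'     = 'iceberg',
    'catalog-type'  = 'hive',
    'catalog-name'  = 'dev',
    'warehouse'     = 's3a://warehouse',
    'hive-conf-dir' = './conf'
  )
AS
SELECT * FROM t_k_orders;
```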
It's doing several things, only one of which is particularly obvious:
- The obvious: creating a table with a bunch of Iceberg properties
- Less obvious (1): defining the schema for the Iceberg table implicitly as matching that of the source <span class="inline-code">t_k_orders</span>
- Less obvious (2): populating the Iceberg table with an unmodified stream of records fetched from Kafka
Looking at that final point though, we could do so much more. How about filtering for orders with a high cost value?
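Assuming the hypothetical <span class="inline-code">cost</span> field from the example schema above, it's just a <span class="inline-code">WHERE</span> clause on the CTAS:

```sql
CREATE TABLE t_i_orders_high_value
  WITH ('connector'='iceberg', 'catalog-type'='hive', 'catalog-name'='dev',
        'warehouse'='s3a://warehouse', 'hive-conf-dir'='./conf')
AS
SELECT * FROM t_k_orders
WHERE cost >= 100;
```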
Or customers who are VIPs?
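Again with the hypothetical schema, this time filtering on the <span class="inline-code">customer_vip</span> flag:

```sql
CREATE TABLE t_i_orders_vip
  WITH ('connector'='iceberg', 'catalog-type'='hive', 'catalog-name'='dev',
        'warehouse'='s3a://warehouse', 'hive-conf-dir'='./conf')
AS
SELECT * FROM t_k_orders
WHERE customer_vip = TRUE;
```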
We can also project a different set of columns, perhaps extracting just a subset of the fields for optimisation reasons:
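For example, keeping just three of the hypothetical fields:

```sql
CREATE TABLE t_i_orders_slim
  WITH ('connector'='iceberg', 'catalog-type'='hive', 'catalog-name'='dev',
        'warehouse'='s3a://warehouse', 'hive-conf-dir'='./conf')
AS
SELECT order_id, customer_name, cost
FROM t_k_orders;
```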
Finally, we don't have to use CTAS. Perhaps you simply want to decouple the creation from the population, or you want to have multiple streams populating the same sink and keeping things separate is more logical. The above two examples combined might look like this:
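(A sketch of that approach, still using the hypothetical schema and the same illustrative connector options as before.)

```sql
-- Create the Iceberg sink explicitly, declaring its schema
CREATE TABLE t_i_orders_subset (
  order_id      INT,
  customer_name STRING,
  cost          DOUBLE
) WITH ('connector'='iceberg', 'catalog-type'='hive', 'catalog-name'='dev',
        'warehouse'='s3a://warehouse', 'hive-conf-dir'='./conf');

-- Populate it from two separate streaming queries
INSERT INTO t_i_orders_subset
  SELECT order_id, customer_name, cost FROM t_k_orders WHERE cost >= 100;

INSERT INTO t_i_orders_subset
  SELECT order_id, customer_name, cost FROM t_k_orders WHERE customer_vip = TRUE;
```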
The <span class="inline-code">INSERT</span> statements are streaming jobs that run continuously, as can be seen from the <span class="inline-code">SHOW JOBS</span> output.
You could even combine the two <span class="inline-code">INSERT</span>s into one with a <span class="inline-code">UNION</span>:
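Something like this (same hypothetical fields as before):

```sql
INSERT INTO t_i_orders_subset
  SELECT order_id, customer_name, cost FROM t_k_orders WHERE cost >= 100
  UNION
  SELECT order_id, customer_name, cost FROM t_k_orders WHERE customer_vip = TRUE;
```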
This gives us a single Flink job doing the work of both.
Iceberg table properties
There are various properties that you can configure for an Iceberg table. In my example above I just specified the bare minimum and let everything else use its default.
You can specify additional properties when you create the table. For example, to change the format of the data files from the default of Parquet to ORC, set the <span class="inline-code">write.format.default</span>:
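For example, something like this. I'm assuming here that table properties can be passed alongside the connector options in the same <span class="inline-code">WITH</span> clause, which matches my understanding of the Flink Iceberg connector:

```sql
CREATE TABLE t_i_orders_orc
  WITH ('connector'='iceberg', 'catalog-type'='hive', 'catalog-name'='dev',
        'warehouse'='s3a://warehouse', 'hive-conf-dir'='./conf',
        'write.format.default'='orc')
AS
SELECT * FROM t_k_orders;
```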
When populated, the table stores its data, as expected, in ORC format.
You can also change the value after creation:
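Using Flink's <span class="inline-code">ALTER TABLE … SET</span> syntax (table name as per the sketch above):

```sql
ALTER TABLE t_i_orders SET ('write.format.default'='orc');
```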
or reset it to its default value:
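With <span class="inline-code">ALTER TABLE … RESET</span> (again a sketch; check that your Flink version supports it):

```sql
ALTER TABLE t_i_orders RESET ('write.format.default');
```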
Iceberg dependencies for Flink
As I explored (at length!) in several previous articles, you often need to add dependencies to Flink for it to work with other technologies. Whether it's formats (such as Parquet), catalogs (such as Hive), or connectors (such as Iceberg), juggling JARs is one of the most fun aspects of running Flink yourself. Of course, if you don't enjoy that kind of fun, you could just let Decodable do it for you.
Catalog
To use Iceberg you need a catalog metastore. Catalogs in Flink can be a bit hairy to understand at first, but have a read of this primer that I wrote for a gentle yet thorough introduction to them.
Iceberg can work with different catalogs (I explore several of them here), and in this blog article I've used the Hive Metastore (HMS).
To use the Hive Metastore with Flink you need the Hive connector <span class="inline-code">flink-sql-connector-hive-3.1.3</span>, which you can find on Maven Repository. Make sure you use the correct one for your version of Flink.
As well as the Hive connector, you need the Hadoop dependencies. I wrote about this elsewhere, but in short you either install the full Hadoop distribution or cherry-pick just the JARs needed.
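If you go the full-distribution route, it's then a case of making the Hadoop classes visible to Flink before you start it:

```bash
# Expose the Hadoop classes to Flink
export HADOOP_CLASSPATH=$(hadoop classpath)
```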
You also need to configure <span class="inline-code">conf/hive-site.xml</span> so that Flink can find the Hive Metastore:
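At a minimum it needs to point at the metastore's Thrift endpoint; the hostname and port here are from my Docker Compose setup, so adjust as needed:

```xml
<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://hms:9083</value>
  </property>
</configuration>
```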
Iceberg JARs
Iceberg support in Flink is provided by the <span class="inline-code">iceberg-flink-runtime</span> JAR. As with the catalog JAR, make sure you line up your versions, in this case of both Iceberg and Flink. For Iceberg 1.5.0 and Flink 1.18 I used <span class="inline-code">iceberg-flink-runtime-1.18-1.5.0.jar</span>.
Object Store (S3)
Finally, since we're writing data to S3-compatible MinIO, we need the correct JARs for that. I'm using <span class="inline-code">s3a</span>, which is provided by <span class="inline-code">hadoop-aws-3.3.4.jar</span>.
Flink will also need to know how to authenticate to S3, and in the case of MinIO, how to find it. One way to do this is supply these details as part of <span class="inline-code">conf/hive-site.xml</span>:
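These go into the same <span class="inline-code">hive-site.xml</span>; the endpoint and credentials shown are illustrative values for a local MinIO instance:

```xml
<configuration>
  <!-- Illustrative values for a local MinIO instance -->
  <property>
    <name>fs.s3a.endpoint</name>
    <value>http://minio:9000</value>
  </property>
  <property>
    <name>fs.s3a.access.key</name>
    <value>admin</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>password</value>
  </property>
  <property>
    <name>fs.s3a.path.style.access</name>
    <value>true</value>
  </property>
</configuration>
```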
Want the easy route?
(Of course, the really easy route is to use Decodable, where we do all of this for you.)
I've shared on the Decodable examples GitHub repo the following:
- a Docker Compose so that you can run all of this yourself just by running <span class="inline-code">docker compose up</span>
- a list of the specific JAR files that I used
- and more… sample SQL statements, a step-by-step README, and some other noodling around with Iceberg and catalogs for good measure.
What if you get in a jam with your JARs?
Here are the various errors (usually a <span class="inline-code">ClassNotFoundException</span>, but not always) that you can expect to see if you miss a JAR from your dependencies. The missing JAR shown is based on the following versions:
- Flink 1.18
- Hive 3.1.3
- Iceberg 1.5.0
- Hadoop 3.3.4
You can find out more here about locating the correct JAR to download and where to put it once you've downloaded it.
Conclusion
In this blog post I've shown you how to stream data from Kafka to Iceberg using purely open-source tools. I used MinIO for storage, but it would work just the same with S3. The catalog I used was the Hive Metastore, but there are others, such as AWS Glue.
If you want to try it out for yourself, head to the GitHub repository. And if you want to load data from Kafka to Iceberg but don't fancy running Flink yourself, sign up for a free Decodable account today and give our fully managed service a go.