July 2, 2024
10 min read

Decodable vs. Amazon MSF: Getting Started with Flink SQL

By Robin Moffatt

One of the things that I love about SQL is the power that it gives you to work with data in a declarative manner. I want this thing…go do it. How should it do it? Well that’s the problem for the particular engine, not me. As a language with a pedigree of multiple decades and no sign of waning (despite a wobbly patch for some whilst NoSQL figured out they actually wanted to be NewSQL 😉), it’s the lingua franca of data systems.

Which is a long way of saying: if you work with data, SQL is mandatory. There, I said it. And if you’re transforming data as part of an ETL or ELT pipeline, it’s one of the simplest and most powerful ways to do it. So let’s see how we can use it with two different platforms in the market that are both built on Apache Flink but expose their support for SQL (specifically, Flink SQL) in very different ways. The first of these is Amazon’s MSF (Managed Service for Apache Flink), and the second is Decodable.

At this point a disclaimer is perhaps warranted, if not entirely obvious: I work for Decodable, and you’re reading this on a Decodable blog. So I’m going to throw MSF under the bus, obviously? Well, no. I’m not interested in that kind of FUD. Hopefully this blog will simply serve to illustrate the similarities and differences between the two.

Let’s build something!

I’m going to take a look at what it takes to build an example of a common pipeline—getting data from a Kafka topic and using SQL to create some aggregate calculations against the data and send it to an S3 bucket in Iceberg format.

I’ll start by exploring Amazon MSF and what it takes to build out the pipeline from the point of view of a data engineer who is deeply familiar with SQL—but not Java or Python (hence this blog - Flink SQL!). After that I’ll look at doing the same thing on Decodable and what the differences are.

Running SQL on Amazon MSF

After logging into the AWS Console and going to the MSF page, I have two options for getting started, <span class="inline-code">Streaming applications</span> or <span class="inline-code">Studio notebooks</span>.

CleanShot 2024-06-17 at 11.13.20

Having not used MSF before, and following the logic that I’m building a pipeline rather than just exploring data, I initially went down the route of creating a <span class="inline-code">Streaming application</span>. This turned out to be a dead end, since it’s only applicable if you’re writing Java or Python. Whilst you can wrap Flink SQL within these, it’s hardly the pleasant developer experience I was hoping for. I subsequently made my way over to <span class="inline-code">Studio notebooks</span>. This gives you what it says on the tin—a notebook. Beloved of data scientists and analysts, notebooks are brilliant for noodling around with bits of data, creating reproducible examples to share with colleagues, etc.

Jupyter and Apache Zeppelin are the two primary notebook implementations used, and here Amazon has gone with a pretty vanilla deployment of Zeppelin. This is what you get after creating the notebook and starting it up:

CleanShot 2024-06-17 at 12.04.25

This is almost too vanilla for my liking; as someone coming to MSF with the hope of doing some Flink SQL, I’m pretty much left to my own devices to figure out what to do next. I clicked on <span class="inline-code">Create new note</span> and ended up with this:

CleanShot 2024-06-18 at 11.21.58@2x

’k cool. Now what?

kermit-the-frog-looking-for-directions-gif

(Ironically, if you were to run the latest version of Zeppelin yourself you’d find that it ships with a whole folder of Flink examples—which would have been rather useful here.)

Anyway, back on Amazon MSF I went back a screen and opened the <span class="inline-code">Examples</span> notebook. This at least made more sense than a blank page. 🙂

CleanShot 2024-06-17 at 12.07.57

The <span class="inline-code">%flink.ssql</span> in each cell denotes the interpreter that’s used to execute the code in that cell—there are several Flink interpreters available for Apache Zeppelin.

So let’s actually give it a go. As mentioned above, the pipeline we’re building is to stream data from a Kafka topic into Apache Iceberg on S3, calculating some aggregations on the way.

Reading data from Kafka with Amazon MSF

Having explored Flink SQL for myself over the past few months I knew to go and look up the Kafka connector for Flink. Connectors in Flink SQL are defined as tables, from which one can read and write data to the associated system. The <span class="inline-code">Examples</span> notebook also gives, well, examples to follow. To start with I wanted to just check connectivity, so I deleted what was there in the cell and created a single-column table connecting to my cluster:

CleanShot 2024-06-18 at 11.47.27 2

Hmmm, no good.

<console>:5: error: unclosed character literal (or use " for string literal "connector")
          'connector' = 'kafka',
                    ^
<console>:5: error: unclosed character literal (or use " for string literal "kafka")
          'connector' = 'kafka',

I mean, it looks OK to me. After some circling around, I realised that I’d not only deleted the existing example code, but the interpreter string too. With the magic <span class="inline-code">%flink.ssql</span> restored, my table was successfully created:

CleanShot 2024-06-18 at 11.47.58
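For reference, the cell ended up looking something like this (a minimal sketch, with the broker address as a placeholder rather than my actual config):

%flink.ssql

-- single-column table purely to check connectivity
CREATE TABLE msf_kafka_test_01 (
    column1 STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'test_from_msf',
    'properties.bootstrap.servers' = 'broker:29092',
    'format' = 'json'
);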

This is where the nice bit of notebooks comes in—I can build up this example step by step, and actually share the notebook at the end with you, dear reader, and you’ll be able to run it too. I can also add comments and narration within the notebook itself.

CleanShot 2024-06-18 at 12.32.17
INSERT INTO msf_kafka_test_01 VALUES ('hello world') 

Now let’s run the <span class="inline-code">INSERT</span> shown here, which if all has gone well should result in a message in JSON format (as specified by <span class="inline-code">format</span> in the table definition) in my <span class="inline-code">test_from_msf</span> topic:

# Here's the topic
kcat -b broker:29092 -L
Metadata for all topics (from broker 1: broker:29092/1):
 1 brokers:
  broker 1 at broker:29092 (controller)
 1 topics:
  topic "test_from_msf" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1

# Here's the message
kcat -b broker:29092 -C -t test_from_msf
{"column1":"hello world"}
% Reached end of topic test_from_msf [0] at offset 1

Looks good! Let’s bring in the actual Kafka topic that we want to read from. The data is simulated transactions from a chain of supermarkets and looks like this:

{
    "customerId": "dc91f8a7-c310-8656-f3ef-f6f0a7af7e58",
    "customerName": "Laure Treutel",
    "customerAddress": "Suite 547 635 Brice Radial, New Jantown, NH 34355",
    "storeId": "401fdb6d-ecc2-75a5-8d0d-efdc64e8637b",
    "storeName": "Swaniawski Group",
    "storeLocation": "Taylorton",
    "products": [
        {
            "productName": "Mediocre Bronze Shoes",
            "quantity": 4.90896716771463,
            "unitPrice": 2.21,
            "category": "Garden, Industrial & Shoes"
        },
        {
            "productName": "Aerodynamic Iron Table",
            "quantity": 2.5936751424049076,
            "unitPrice": 15.79,
            "category": "Music"
        },
        […]
    ],
    "timestamp": "2024-06-16T16:11:21.695+0000"
}

Our first task is to come up with the DDL to declare this table’s schema. You can do this by hand, or trust to the LLM gods—I found Claude 3 Opus gave me what seemed like a pretty accurate answer:

CleanShot 2024-06-18 at 12.52.00

The only thing I needed to change was quoting the reserved word <span class="inline-code">timestamp</span> being used as a field name. Using this I started a new notebook that would be for my actual pipeline, and began with creating the Kafka source:

CleanShot 2024-06-18 at 12.56.07

With the table created, let’s run a <span class="inline-code">SELECT</span> to sample the data to make sure things are working:

CleanShot 2024-06-18 at 14.19.18

Instead of fighting with type conversions at this stage (that fun can wait!) I’m just going to make this a <span class="inline-code">STRING</span> field and worry about it later. Unfortunately, MSF has other ideas:

CleanShot 2024-06-18 at 14.40.17

After some digging it turns out that <span class="inline-code">ALTER TABLE…MODIFY</span> was added in Flink 1.17 and MSF is running Flink 1.15.
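For reference, on a newer Flink the change would be a one-liner along these lines (the table and column names here are my assumption of what was being altered):

-- Needs Flink 1.17 or later; MSF's Flink 1.15 rejects it
ALTER TABLE basket01 MODIFY `timestamp` STRING;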

Since we’re not using the table anywhere yet, let’s just drop it and recreate it.

CleanShot 2024-06-18 at 14.53.22

This is good, and bad. Mostly bad. The good bit is that it reminds me that when I created the notebook I selected an existing Glue catalog, and it looks like MSF is seamlessly using it. The bad? I need to go IAM-diving to figure out what the permission problem is here.

Or…sidestep the issue and just create the table with a different name 😁

CleanShot 2024-06-18 at 15.01.24
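Reconstructed from the sample data above, the working source table looked roughly like this (topic, broker, and startup mode are placeholders for my setup rather than a copy of the screenshot):

%flink.ssql

CREATE TABLE basket01 (
    customerId      STRING,
    customerName    STRING,
    customerAddress STRING,
    storeId         STRING,
    storeName       STRING,
    storeLocation   STRING,
    products        ARRAY<ROW<productName STRING,
                              quantity    DOUBLE,
                              unitPrice   DOUBLE,
                              category    STRING>>,
    -- reserved word, so it needs quoting; left as STRING for now
    `timestamp`     STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'basket01',
    'properties.bootstrap.servers' = 'broker:29092',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);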

OK! Now we are getting somewhere. With data flowing in from our Kafka topic let’s build the second half of our pipeline, in which we’re going to compute aggregates on the data before sending it to Apache Iceberg.

Running Flink SQL to transform data on Amazon MSF

Whilst configuring the connection to Kafka might have been a bit more fiddly than we’d have liked, building actual transformational SQL in the Zeppelin notebook is precisely its sweet spot. You can iterate to your heart’s content to get the syntax right and dataset exactly as you want it. You can also take advantage of Zeppelin’s built-in visualisation capabilities to explore the data further:

CleanShot 2024-06-18 at 16.44.41

With the data exploration and noodling done, we’ll wrap our aggregate query as a <span class="inline-code">VIEW</span> to use later when we send it to Iceberg.

CREATE VIEW basket_agg AS 
SELECT customerName,
    COUNT(*) AS unique_items,
    SUM(quantity) AS unit_ct
FROM basket01
    CROSS JOIN UNNEST(products) AS t(productName, quantity, unitPrice, category)
GROUP BY customerName;

Here I’m using <span class="inline-code">UNNEST</span> to get at the individual records in the <span class="inline-code">products</span> array, and create a result set of the number of basket items (<span class="inline-code">COUNT(*)</span>) and total units (<span class="inline-code">SUM(quantity)</span>) broken down by customer (<span class="inline-code">customerName</span>).

Attempting to write to Apache Iceberg from Amazon MSF…

Just as we used a table to read data from Kafka above, we’ll use a table to write to Iceberg on S3.

If I were running this on my own Apache Flink cluster, I’d do this:

Flink SQL> CREATE TABLE basket_iceberg
           WITH (
           'connector' = 'iceberg',
           'catalog-type'='hive',
           'catalog-name'='dev',
           'warehouse' = 's3a://warehouse',
           'hive-conf-dir' = './conf') AS
           SELECT * FROM basket_agg; 

This is based on using Hive as a catalog metastore, whilst here I’m using Glue. I figured I’d take a rough guess at the syntax of the command, assuming that I could jiggle things based on the resulting errors until they worked.

But I didn’t even get that far. MSF doesn’t seem to be happy with my <span class="inline-code">CREATE TABLE…AS SELECT</span>.

CleanShot 2024-06-19 at 16.00.55

As with <span class="inline-code">ALTER TABLE…MODIFY</span> above, <span class="inline-code">CREATE TABLE…AS SELECT</span> (CTAS) wasn’t added to Flink until 1.16, and MSF is on 1.15. Instead, we’re going to have to split it into two stages: create the table, and then populate it.

Unlike CTAS, where the schema is inherited from the <span class="inline-code">SELECT</span>, here we’re going to have to define the schema manually. We can get the schema from a <span class="inline-code">DESCRIBE</span> against the view:

CleanShot 2024-06-19 at 16.03.06

Copying this schema into our <span class="inline-code">CREATE TABLE</span> gives us this:

CleanShot 2024-06-19 at 16.04.18
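Piecing it together from the schema and the table name that show up in the error messages later, the sink DDL was roughly this (my first, optimistic attempt at the connector config):

%flink.ssql

CREATE TABLE basket_iceberg_00 (
    customerName STRING,
    unique_items BIGINT,
    unit_ct      DOUBLE
) WITH (
    'connector' = 'iceberg'
);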

So now to populate it…maybe.

CleanShot 2024-06-19 at 16.05.20

Ahhhh, Java stack traces, my old friend. How I’ve missed you.

not

The one we’ve got here boils down to this:

org.apache.flink.table.api.ValidationException: 
Could not find any factory for identifier 'iceberg' that implements
'org.apache.flink.table.factories.DynamicTableFactory' in the classpath. 

I’ve encountered this error previously. It means that the Flink installation only has a certain set of connectors installed, and not the one we want. The error message actually tells us which ones are available:

Available factory identifiers are:

blackhole
datagen
filesystem
kafka
kinesis
print
upsert-kafka

So how do we write to Iceberg? We need to install the Iceberg connector, obviously! The MSF docs explain how to do this. In essence, I need to put the Iceberg JAR in an S3 bucket, and restart the notebook with the dependency added.

Let’s grab the Iceberg JAR for Flink 1.15 (wanna know more about navigating JAR-hell? You’ll enjoy this blog that I wrote) and upload it to S3:

curl https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.15/1.4.3/iceberg-flink-runtime-1.15-1.4.3.jar -O
aws s3 cp iceberg-flink-runtime-1.15-1.4.3.jar s3://rmoff/iceberg-flink-runtime-1.15-1.4.3.jar

Now to add it as a custom connector:

CleanShot 2024-06-19 at 17.01.09

Unfortunately, MSF wasn’t happy with this. When I click on Create Studio notebook it fails after a minute or two with this:

CleanShot 2024-06-19 at 17.04.49
Please check the role provided or validity of S3 location you provided. We are unable to get the specified fileKey: s3://rmoff/iceberg-flink-runtime-1.15-1.4.3.jar in the specified bucket: rmoff

Ho hum. Since it says “please check the role” I went back a couple of screens to review the notebook’s configuration for IAM, which shows this:

CleanShot 2024-06-19 at 17.10.32

The IAM role is being created automagically, so things ought to Just Work? Right?


So down the rabbit hole we go to the IAM policy editor. Looking up the permissions attached to the policy that MSF created shows, after a bit of scratching around, the problem:

CleanShot 2024-06-19 at 17.17.56

The problem wasn’t actually the IAM that MSF provisioned, but the information with which it did so.

When I specified the Path to S3 Object I gave the fully qualified S3 URL, <span class="inline-code">s3://rmoff/iceberg-flink-runtime-1.15-1.4.3.jar</span>. What MSF wanted was the path within the S3 bucket that I’d also specified (<span class="inline-code">s3://rmoff</span>). What this ended up with was MSF concatenating the two and configuring IAM for a JAR file at <span class="inline-code">s3://rmoff/s3://rmoff/iceberg-flink-runtime-1.15-1.4.3.jar</span>. This is also confirmed if I go back to the connector configuration screen and look closely at the S3 URI shown as a result of my erroneous input:

CleanShot 2024-06-19 at 17.23.17

So, let’s remove that connector and re-add it with the correct path this time:

CleanShot 2024-06-19 at 17.24.13

I can also click on the hyperlink to confirm that the object is there:

CleanShot 2024-06-19 at 17.24.34

Having some guard rails provided by input validation would have been a nice touch here, but it wasn’t to be.

So. Back to starting the notebook up again.

CleanShot 2024-06-19 at 17.25.34

Woohoo! Let’s go open that notebook and get back to where we were at before this irritating detour, and now hopefully with Iceberg superpowers 🦸. Because we’ve got the Glue catalog storing our table definitions, we should be able to pick up where we left off. This time we get a different error when we try to write to the Iceberg sink—progress!

CleanShot 2024-06-19 at 17.56.36
Caused by: org.apache.flink.table.api.ValidationException: 
Unable to create a sink for writing table 'hive.rmoff.basket_iceberg_00'.

[…]
Caused by: java.lang.NullPointerException: 
Table property 'catalog-name' cannot be null

To be honest, I wasn’t expecting creating an Iceberg connection to be quite as simple as what I tried:

CREATE TABLE […]
    WITH ('connector' = 'iceberg') 

So now onto the fun and games of working out the correct parameters. The complication here—and the reason I can’t just copy and paste like any good developer reading StackOverflow—is that on MSF the catalog is provided by Glue, not Hive. The MSF documentation on connectors outside Kinesis and Kafka is pretty sparse, and there’s nothing for Iceberg that I could find. The documentation for Apache Iceberg’s Flink connector lists the properties required, and after a bit of poking around there I found this section about using it with a Glue catalog. There’s also this section on using Iceberg from Amazon EMR—which is not the same as MSF, but perhaps has the same pattern of access for Glue and S3 when writing Iceberg data. One point to note here is that the EMR document talks about creating Flink <span class="inline-code">CATALOG</span> objects—I’d recommend you review my primer on catalogs in Flink too if you haven’t already, since it’s an area full of ambiguity and overloaded terms.

Distilling down the above resources, I tried this configuration for the table next:

CREATE TABLE basket_iceberg_01 (
            customerName STRING,
            unique_items BIGINT,
            unit_ct DOUBLE)
       WITH ('connector'   = 'iceberg',
             'catalog-type'= 'glue',
             'catalog-name'= 'rmoff',
             'warehouse'   = 's3://rmoff/msf-test',
             'io-impl'     = 'org.apache.iceberg.aws.s3.S3FileIO') 

Here <span class="inline-code">catalog-name</span> matches my Glue catalog name. When I try to insert into it, I get a different error: <span class="inline-code">java.lang.ClassNotFoundException: org.apache.hadoop.hdfs.HdfsConfiguration</span>.

I mean, we’re only this far in and haven’t had a <span class="inline-code">ClassNotFoundException</span> yet…I think we’re doing pretty well 🤷? My impression so far is that MSF feels like the trials and tribulations of learning Apache Flink running locally, but with one arm tied behind my back: I’m trying to debug a system running a version of Flink that’s two years old, with no filesystem access to check things like JARs.

Back to our story. I spent a bunch of time learning about JARs in Flink so was happy taking on this error. <span class="inline-code">ClassNotFoundException</span> means that Flink can’t find a Java class that it needs. It’s usually solved by providing a JAR that includes the class and/or sorting out things like classpaths so that the JAR can be found. Searching a local Hadoop installation (remember those?) turned up this:

Found in: /Users/rmoff/hadoop/hadoop-3.3.4/share/hadoop/hdfs/hadoop-hdfs-client-3.3.4.jar
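In case you’re curious how to do that kind of search, something along these lines does the job; the path is just my local Hadoop install, so adjust to taste:

# Find which JAR on disk contains the class Flink says it can't load
find ~/hadoop/hadoop-3.3.4 -name '*.jar' | while read -r jar; do
  unzip -l "$jar" 2>/dev/null | grep -q 'org/apache/hadoop/hdfs/HdfsConfiguration.class' \
    && echo "Found in: $jar"
done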

But now I’m a bit puzzled. We’re hoping to write to S3, so why does it need an HDFS dependency?

On a slight tangent: looking at the table DDL I’m running, it uses the Flink Iceberg connector as described here to create a table directly, sidestepping the explicit Iceberg catalog creation usually seen in examples. Let’s try creating a catalog as seen here:

CREATE CATALOG c_iceberg WITH (
   'type'='iceberg',
   'warehouse'='s3://rmoff/msf-test',
   'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
   'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'
 ); 

But, same error: <span class="inline-code">java.lang.ClassNotFoundException: org.apache.hadoop.hdfs.HdfsConfiguration</span>.

Looking at the Iceberg docs again it seems I’ve perhaps missed a dependency, so back to creating a new notebook, this time with <span class="inline-code">hadoop-aws</span> and <span class="inline-code">iceberg-aws-bundle</span> added. I’d wanted to include <span class="inline-code">aws-java-sdk-bundle</span> since I’ve needed that when running things locally, but I guess that one wasn’t to be:

CleanShot 2024-06-19 at 22.00.46@2x
Total combined size for DEPENDENCY_JAR CustomArtifactConfiguration exceeded, 
maximum: 314572800, given: 524262131
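Getting hold of those two extra JARs follows the same pattern as the Iceberg runtime earlier; the exact versions below are my assumption of what would pair with the Iceberg 1.4.3 runtime and Hadoop 3.3.x, rather than gospel:

curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar -O
curl https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/1.4.3/iceberg-aws-bundle-1.4.3.jar -O
aws s3 cp hadoop-aws-3.3.4.jar s3://rmoff/
aws s3 cp iceberg-aws-bundle-1.4.3.jar s3://rmoff/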

So this is what I’ve got for this notebook’s attempt:

CleanShot 2024-06-19 at 22.06.05@2x

Still no dice—same error: <span class="inline-code">java.lang.ClassNotFoundException: org.apache.hadoop.hdfs.HdfsConfiguration</span>. Let’s take the bull by the horns and add the specific JAR file that contains this (<span class="inline-code">hadoop-hdfs-client</span>):

CleanShot 2024-06-19 at 22.25.41@2x

After restarting the notebook I can see the JARs picked up in Zeppelin’s <span class="inline-code">flink.execution.jars</span>:

CleanShot 2024-06-19 at 22.30.01@2x

But… still… when I try to create an Iceberg catalog as above, the… same… error… <span class="inline-code">java.lang.ClassNotFoundException: org.apache.hadoop.hdfs.HdfsConfiguration</span>.

ahhh

So what am I going to do? What any sensible data engineer, given half the chance, would do at this point…give up and try something different. Instead of writing to Iceberg on S3, we’ll compromise and simply write Parquet files to S3. This should hopefully be easier, since the Flink filesystem connector is one of the connectors included with MSF.

Attempting to write Parquet files to S3 from Amazon MSF…

This should be much simpler:

CREATE TABLE s3_sink  (
            customerName STRING,
            unique_items BIGINT,
            unit_ct DOUBLE) 
            WITH ('connector'='filesystem', 
                  'path' = 's3://rmoff/msf-test-parquet',
                  'format' = 'parquet');

INSERT INTO s3_sink 
SELECT * FROM basket_agg; 

But simple is boring, right?

org.apache.flink.table.api.ValidationException: 
Could not find any format factory for identifier 'parquet' in the classpath. 

Up and over to the dependencies again, since we need to add the necessary JAR for parquet format.

CleanShot 2024-06-19 at 22.54.43@2x

But can you spot what might be the problem?

CleanShot 2024-06-19 at 22.54.18@2x

I’d picked up the JAR for Flink 1.18.1—and MSF is running Flink 1.15. I don’t know for certain that the version difference is the problem, but from experience aligning versions is one of the first things to do when troubleshooting.
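Fetching the version-matched Parquet format JAR is the same dance as before (the artifact name matches what shows up in the IAM policy later on):

curl https://repo1.maven.org/maven2/org/apache/flink/flink-sql-parquet/1.15.0/flink-sql-parquet-1.15.0.jar -O
aws s3 cp flink-sql-parquet-1.15.0.jar s3://rmoff/flink-sql-parquet-1.15.0.jar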

So, stop the notebook, update the JAR (it’s called “Custom Connectors” in the UI, but it seems to encompass any dependencies), start the notebook… and progress (of sorts):

CleanShot 2024-06-19 at 23.05.28@2x

Of course…we’ve created an aggregate, and we’re trying to write it to a file type which doesn’t support updates. Parquet is append-only, so as the aggregate values change for a given key, how can we represent that in the file?

So that we’ve not completely failed to build an end-to-end pipeline I’m going to shift the goalposts and change the aggregate to a filter, selecting all customers from the Kafka source whose names begin with the letter “b”:

CREATE VIEW customers_b AS 
SELECT customerId, customerName, customerAddress
FROM basket01
WHERE LOWER(customerName) LIKE 'b%'; 

Now let’s write the filtered data to Parquet on S3:

CREATE TABLE s3_customers_b (
     customerId STRING,
     customerName STRING,
     customerAddress STRING
) WITH (
     'connector' = 'filesystem',
     'path'      = 's3://rmoff/msf-test-parquet',
     'format'    = 'parquet'
);

INSERT INTO s3_customers_b SELECT * FROM customers_b; 

This works! For ten glorious seconds! And then…

CleanShot 2024-06-19 at 23.14.01@2x

A couple of StackOverflow answers suggest that even with the Parquet JAR I still need some Hadoop dependencies. So back to the familiar pattern:

  1. Close the notebook
  2. Force stop the notebook (it’s quicker, and I don’t care about data at this point)
  3. Wait for the force stop to complete
  4. Add hadoop-common-3.3.4.jar to the list of Custom connectors for the notebook
  5. Wait for the notebook to update
  6. ‘Run’ (start) the notebook
  7. Wait for the start to complete
  8. Open the notebook
  9. Rerun the failing statement

Unfortunately I’m now just off down the same whack-a-mole route as I was with Iceberg, this time with <span class="inline-code">java.lang.ClassNotFoundException: com.ctc.wstx.io.InputBootstrapper</span>:

CleanShot 2024-06-20 at 10.02.28

Even after adding <span class="inline-code">hadoop-client-runtime-3.3.4.jar</span>, I get the same error. So now I’m wondering about the versions. What version of Hadoop dependencies is Flink 1.15 going to want? I failed to find anything in the Flink docs about versions, so instead went to Maven Repository and picked a Hadoop version based on the date that was fairly close to the release of Flink 1.15 (May 2022)—3.2.2.
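That meant yet another pair of downloads and uploads; the file names match those that appear in the IAM policy further down:

curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-common/3.2.2/hadoop-common-3.2.2.jar -O
curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/3.2.2/hadoop-client-runtime-3.2.2.jar -O
aws s3 cp hadoop-common-3.2.2.jar s3://rmoff/
aws s3 cp hadoop-client-runtime-3.2.2.jar s3://rmoff/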

This was the last roll of the dice…

CleanShot 2024-06-20 at 10.51.15

Still no dice.

CleanShot 2024-06-20 at 11.06.12

Writing CSV or JSON files to S3 from Amazon MSF

So now I’m going to fall back from Iceberg…to Parquet…to CSV…something…anything…

Here’s the CSV table and <span class="inline-code">INSERT</span>:

CleanShot 2024-06-20 at 11.12.23
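For the record, the DDL mirrored the Parquet version; a sketch (the table name here is illustrative, and the path matches the error below):

CREATE TABLE s3_customers_b_csv (
     customerId      STRING,
     customerName    STRING,
     customerAddress STRING
) WITH (
     'connector' = 'filesystem',
     'path'      = 's3://rmoff/msf-test-csv',
     'format'    = 'csv'
);

INSERT INTO s3_customers_b_csv SELECT * FROM customers_b;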

But hey—a different error 😃 And this one looks less JAR-related and probably more IAM.

java.nio.file.AccessDeniedException: msf-test-csv/part-3b4dd54f-e4f5-452f-9884-9d131908302c-0-0: initiate MultiPartUpload on msf-test-csv/part-3b4dd54f-e4f5-452f-9884-9d131908302c-0-0: 
com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: 5209TZF3D521ZQJP; S3 Extended Request ID: QIQm89EsUXNYlHVKUHApjZjY1abc4RjK65rvLUW2BdtOC62AGgV99XjMwdlBNVJVVrOIHIGMoBaLqzh/038gyA==; Proxy: null) 

This actually seems fair enough; looking at the IAM details from the AWS console, the IAM role that MSF automagically configures only has access on S3 to the three JARs that I uploaded when I was trying to get parquet to work:

{
    "Sid": "ReadCustomArtifact",
    "Effect": "Allow",
    "Action": [
        "s3:GetObject",
        "s3:GetObjectVersion"
    ],
    "Resource": [
        "arn:aws:s3:::rmoff/hadoop-common-3.2.2.jar",
        "arn:aws:s3:::rmoff/hadoop-client-runtime-3.2.2.jar",
        "arn:aws:s3:::rmoff/flink-sql-parquet-1.15.0.jar"
    ]
},

I added a second policy just to try to get things to work for now. It’s very permissive, and overlaps with the <span class="inline-code">ReadCustomArtifact</span> one (which arguably is redundant anyway if we don’t need any of the parquet and dependency JARs):

{
    "Sid": "ReadAndWriteData",
    "Effect": "Allow",
    "Action": [
        "s3:*"
    ],
    "Resource": [
        "arn:aws:s3:::rmoff/*"
    ]
}

Back to the notebook and things are finally looking hopeful:

CleanShot 2024-06-20 at 11.29.58

Two minutes and no error yet! But also…no data yet:

$ aws s3 ls s3://rmoff/msf-test-csv

$

Twiddling my thumbs and clicking around a bit, I noticed that this little FLINK JOB icon had appeared a short while after running the statement:

CleanShot 2024-06-20 at 12.03.19

Clicking on it takes me to the vanilla Flink web dashboard:

CleanShot 2024-06-20 at 12.04.35

CleanShot 2024-06-20 at 12.16.56

This is nice, but doesn’t help me with where my CSV files are. A bit more digging around leads me to the FileSystem documentation and specifically the section about when the part files are written. The <span class="inline-code">sink.rolling-policy.rollover-interval</span> defaults to 30 minutes. Sure enough, after 30 minutes (ish) something does happen:

CleanShot 2024-06-20 at 12.21.28
java.lang.UnsupportedOperationException: 
S3RecoverableFsDataOutputStream cannot sync state to S3. 
Use persist() to create a persistent recoverable intermediate point.

Funnily enough, I’ve hit a very similar error previously, which then leads me to FLINK-28513. Unfortunately this is a bug from Flink 1.15 which has been fixed in 1.17 and later…but MSF is on 1.15, so we’re stuck with it.

Continuing our descent into oblivion of finding a file format—any file format!—to successfully write to S3, let’s try JSON:

CREATE TABLE s3_customers_b_json_01 (
     customerId      STRING,
     customerName    STRING,
     customerAddress STRING
) WITH (
     'connector' = 'filesystem',
     'path'      = 's3://rmoff/msf-test/customers_b-json',
     'format'    = 'json',
     'sink.rolling-policy.rollover-interval' = '30sec'
); 

I’ve also overridden the default <span class="inline-code">sink.rolling-policy.rollover-interval</span> to 30 seconds. If I understand the documentation correctly, then every 30 seconds Flink should start a new file to write to on disk.

Let’s try it!

INSERT INTO s3_customers_b_json_01
    SELECT * FROM customers_b; 

After a break for tea, I came back to find the job still running (yay!)

CleanShot 2024-06-20 at 15.38.36

but no data on S3 (boo!)

$ aws s3 ls --recursive s3://rmoff/msf-test

$

The final piece of the puzzle that I’ve been missing is checkpointing. By default this is disabled, which can be seen here:

CleanShot 2024-06-20 at 15.41.17

I can’t see a way to force a Flink SQL job to checkpoint, so I cancelled the <span class="inline-code">INSERT</span> and re-ran it with checkpointing enabled:

SET 'execution.checkpointing.interval' = '10sec';

INSERT INTO s3_customers_b_json_01
    SELECT * FROM customers_b; 

Ten seconds is pretty aggressive for checkpointing, but then I am pretty impatient when it comes to testing that something is working as I hope. And the good news is that eventually, mercifully, it has worked.

aws s3 ls --recursive s3://rmoff/msf-test
2024-06-20 15:53:07      90187 msf-test/customers_b-json/part-09af660f-a216-4282-9d7c-fc0ad7225749-0-0
2024-06-20 15:54:05      92170 msf-test/customers_b-json/part-09af660f-a216-4282-9d7c-fc0ad7225749-0-1

How frequently you set your checkpointing and file rolling policy depends on how many smaller files you want, and how soon you want to be able to read the data that’s being ingested. Ten seconds is almost certainly what you do not want—if you need the data that soon then you should be reading it from the Kafka topic directly.
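For something a bit more production-shaped, you might pair a more relaxed checkpoint interval with a longer rolling policy; the values here are purely illustrative:

SET 'execution.checkpointing.interval' = '1 min';

-- and on the sink table definition:
--   'sink.rolling-policy.rollover-interval' = '15 min'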

📓 You can find the source for the notebook here.

Wrapping things up on MSF

Well, what a journey that was. We eventually got something working to prove out an end-to-end SQL pipeline. We had to make a bunch of compromises—including ditching the original requirement to aggregate the data—which in real life would be a lot more difficult to stomach, or would have more complicated workarounds. But we’ve got a notebook that defines a streaming pipeline. Yay us! The final step with something like this would be to run it as an application. Notebooks are great for exploration and experimentation, but now we have something that works and we don’t want to have to launch Zeppelin each time to run it.

Running an MSF notebook as an application

MSF has an option to deploy a notebook as a streaming application, although the list of caveats is somewhat daunting. The one which I’m stumbling on is this one:

we only support submitting a single job to Flink.

Looking at the Flink Dashboard I can see two jobs, the Kafka source and the S3 sink:

CleanShot 2024-06-20 at 17.19.06 1

Let’s give it a try anyway. After stopping the notebook from the console, there’s an option under Configuration called Deploy as application configuration, which looks like this after I’ve put my bucket name in:

CleanShot 2024-06-20 at 17.28.27

Now after clicking Run on the notebook it starts up again, and I figure it’s deployed it as an application? Maybe? But there’s nothing in the S3 bucket, and the notebook seems to just open as normal, so I’m a bit puzzled as to what’s going on.

It turns out that what’s not mentioned on the docs page itself—but is in a related tutorial—is that there are more steps to follow. First, build the application from the notebook:

CleanShot 2024-06-24 at 10.07.13

CleanShot 2024-06-24 at 10.07.55

…but at this point it already has a problem.

CleanShot 2024-06-24 at 10.08.44
Kafka→⚙️→S3 (JSON) has failed to build and export to Amazon S3: 
Statement is unsupported for deploying as part of a note. 
Only statements starting with INSERT, UPDATE, MERGE, DELETE, 
USE, CREATE, ALTER, DROP, SET or RESET are supported. 
SELECT * FROM basket01 LIMIT 5

So, it looks like my notebook—in which I’ve done what one does in a notebook, namely to explore and iterate and evolve a data processing pipeline—is not ok as it is. I need to go back through it and trim it down to remove the interactive bits (the ones that made it useful as a notebook, really).

The Kafka source and S3 sink tables and the view that I’m using to filter the data for the sink are held in the Glue catalog:

CleanShot 2024-06-24 at 10.18.25

So all I need in my notebook now is the actual ‘application’—the <span class="inline-code">INSERT INTO</span> streaming SQL that connects them all together. I’ve stripped it back and saved it as a new note:

CleanShot 2024-06-24 at 10.19.44

Now when I build it MSF is happy:

CleanShot 2024-06-24 at 10.22.11

In my S3 bucket I can see the built code:

$ aws s3 ls s3://rmoff/rmoff-test-04/zeppelin-code/
2024-06-24 10:22:02  447072452 rmoff-test-04-Kafka____S3__JSON_-2JZ3CVU11-2024-06-24T09:22:01.370764Z.zip

Being of a curious bent, I of course want to know how two SQL statements compile to an application of over 400 MB, so I downloaded the ZIP file. Within a bunch of Python files is this one, <span class="inline-code">note.py</span>:

# This file was synthesized from a Kinesis Analytics Zeppelin note.

from flink_environment import s_env, st_env, gateway
from pyflink.common import *
from pyflink.datastream import *
from pyflink.table import *
from pyflink.table.catalog import *
from pyflink.table.descriptors import *
from pyflink.table.window import *
from pyflink.table.udf import *
from zeppelin_context import z, __zeppelin__

import pyflink
import errno
import os
import json
import textwrap
import sys


print("Executing %flink.ssql paragraph #1")
st_env.execute_sql(
    textwrap.dedent(
        """
        SET 'execution.checkpointing.interval' = '60sec'
        """
    ).strip()
)
statement_set = st_env.create_statement_set()
statement_set.add_insert_sql(
    textwrap.dedent(
        """
        INSERT INTO s3_customers_b_json_01
            SELECT * FROM customers_b
        """
    ).strip()
)
statement_set.execute()

So with the note built into an application ZIP on S3, we can now deploy it. Somewhat confusingly for a product called Managed Service for Apache Flink, we’re now going to deploy the application as a “Kinesis Analytics” application—harking back to the previous name of the product.

CleanShot 2024-06-24 at 10.38.59

CleanShot 2024-06-24 at 10.40.24

This isn’t a one-click deploy, but takes me through to the “Create streaming application” page of MSF with pre-populated fields.

CleanShot 2024-06-24 at 10.41.31

The next step is to click Create streaming application, which deploys the Python app that was built from our notebook.

CleanShot 2024-06-24 at 10.42.28

Finally, we can Run our application:

CleanShot 2024-06-24 at 10.45.05

At this stage, in theory, we should see the records from our Kafka topic come rolling into the S3 buckets, filtered for only customers whose name begins with <span class="inline-code">b</span>…

The application is running:

CleanShot 2024-06-24 at 10.47.50

But on the Flink dashboard that’s available with MSF applications, I don’t see any jobs:

CleanShot 2024-06-24 at 10.49.37

Looking closely at the MSF application dashboard I can see under Monitoring the Logs tab, so click on that—and there are some errors.

CleanShot 2024-06-24 at 10.50.43

It looks like <span class="inline-code">Run python process failed</span> is the root error, and unfortunately there’s no detail as to why, other than <span class="inline-code">java.lang.RuntimeException: Python process exits with code: 1</span>.

At this point I YOLO’d out—getting some Flink SQL to work in the notebook was trouble enough, and deploying it as an application will need to be a job for another day…if ever.

The final and most important step in my MSF exploration was to stop the streaming application and the notebook, since both cost money (even if they are not processing data).

CleanShot 2024-06-17 at 12.25.59

Phew…so that was Amazon MSF (née Kinesis Analytics). What does it look like getting started with Flink SQL on Decodable?

Running Flink SQL on Decodable

Decodable is all about doing things the easy—but powerful—way. Not because you’re lazy, but because you’re smart. Where’s the business differentiation to be had in who can write more boilerplate Flink SQL? So things like connectors don’t even need SQL; they’re just built into the product as first-class resources.

Reading data from Kafka with Decodable

From the home page of the product I’m going to connect to Kafka, and I’ll do it by clicking on the rather obvious “Connect to a source”:

CleanShot 2024-06-18 at 16.02.19

From the list of source connectors I click on the Kafka connector…

CleanShot 2024-06-18 at 16.05.15 1

and configure the broker and topic details:

Next I need to give Decodable a destination for this data:

On the next screen I need to provide the schema. If I had a proper schema to hand I’d use it; for now I’ll take advantage of the schema inference and just give it a sample record:

CleanShot 2024-06-18 at 16.09.50

From this it picks up the schema automagically–I could further refine this if necessary, but it’s good for now:

CleanShot 2024-06-18 at 16.10.22

Once I’ve created the connection I click Start and shortly after the metrics screen confirms that data is being read successfully from Kafka.

CleanShot 2024-06-18 at 16.19.25

Let’s check that we can indeed see this data by clicking on the stream that’s linked to from the connector page and using the preview feature:

CleanShot 2024-06-18 at 16.20.52

Looking good!

Let’s now look at the next bit of the pipeline: processing the data to create aggregates.

Flink SQL in Decodable

From the stream view of the data, I click on Outputs and then Create a pipeline:

CleanShot 2024-06-18 at 16.22.15

This takes me to a SQL editor. I’m using pretty much the same SQL as I did originally with MSF above. The preview is useful to make sure my SQL compiles and returns the expected data:

CleanShot 2024-06-18 at 16.32.31

You may notice the <span class="inline-code">INSERT INTO</span> there too. This creates another stream which holds the continually-computed result of my query. As new records are sent to the Kafka topic, they’re read by Decodable, processed by my SQL pipeline, and the <span class="inline-code">basket_agg</span> stream updated. This is the stream that we’re going to send to Iceberg in the next section.

The final thing to do, since we want to use <span class="inline-code">customerName</span> as the primary key for the results (that is, the field by which the aggregate values are summarised), is to use <span class="inline-code">COALESCE</span> on <span class="inline-code">customerName</span> so that it is by definition a <span class="inline-code">NOT NULL</span> field. The final SQL looks like this:

INSERT INTO basket_agg 
SELECT COALESCE(customerName,'unknown') AS customerName,
    COUNT(*) AS unique_items,
    SUM(quantity) AS unit_ct
FROM basket01
    CROSS JOIN UNNEST(products) AS t(productName, quantity, unitPrice, category)
GROUP BY customerName

With the SQL created and tested, on the next screen we define the stream and set its primary key explicitly:

CleanShot 2024-06-24 at 12.42.48

Finally, I’ll save the pipeline:

CleanShot 2024-06-24 at 12.44.56

and then start it. Depending on the amount of data being processed and the complexity of the pipeline calculations, I could adjust the number and size of the tasks used for processing, but the defaults are fine for now:

CleanShot 2024-06-24 at 12.45.51

Now I can see data coming in from my Kafka source (<span class="inline-code">basket01</span>) and out to an aggregate stream (<span class="inline-code">basket_agg</span>):

CleanShot 2024-06-24 at 12.51.32

Writing to Apache Iceberg from Decodable

The final part here is to wire up our new stream, <span class="inline-code">basket_agg</span>, to the Apache Iceberg sink. From the stream we can directly add a sink connection:

CleanShot 2024-06-24 at 14.36.34
CleanShot 2024-06-24 at 14.37.02

Then it’s just a matter of entering the type of catalog (AWS Glue), table details, and an IAM role with the necessary permissions:

CleanShot 2024-06-24 at 14.40.52

Finally, save the connection

CleanShot 2024-06-24 at 14.41.56

and start it. With that, we’re up and running!

CleanShot 2024-06-24 at 15.14.09
aws s3 ls --recursive s3://rmoff/rmoff.db/basket_agg_01/
2024-06-24 15:13:31     752343 rmoff.db/basket_agg_01/data/00000-0-be4c1e53-17c1-429c-a4c9-7680f92a2eac-00001.parquet
2024-06-24 15:13:32     206222 rmoff.db/basket_agg_01/data/00000-0-be4c1e53-17c1-429c-a4c9-7680f92a2eac-00002.parquet
2024-06-24 15:13:30     122327 rmoff.db/basket_agg_01/data/00000-0-be4c1e53-17c1-429c-a4c9-7680f92a2eac-00003.parquet
[…]

For completeness, here’s the Iceberg data queried through Amazon Athena:

CleanShot 2024-06-24 at 15.26.42

Running real-time ETL on Decodable in production

In the same way that I looked at what it means to move an MSF notebook through to deployment as an application, let’s see what that looks like on Decodable. In practice, what we’ve built is an application. There are no further steps needed. We can monitor it and control it through the web interface (or CLI/API if we’d rather).

Decodable has comprehensive declarative resource management built into the CLI. This means that if we want to store the resource definitions in source control we can download them easily, whether to restore them to another environment or to promote them through environments as part of a CI/CD process.

Conclusion

We’ve seen how two platforms built on Apache Flink offer access to Flink SQL.

On Decodable, the only SQL you need to write is if you want to transform data. The ingest and egress of data is done with connectors that just require configuration through a web UI (or CLI if you’d rather). Everything is built and deployed for you, including any dependencies. You just bring the config and the business logic; we do the rest. Speaking of REST, there’s an API to use too if you’d like :)

On Amazon MSF, SQL is used for building connectors as well as transforming data. Using a connector other than the ones included means making the JAR and its dependencies available to MSF. Each change to a dependency means restarting the notebook, so iteration can be a frustratingly slow process. The version of Flink (1.15) also doesn’t help, as several features (such as <span class="inline-code">CREATE TABLE…AS SELECT</span>) aren’t available. Lastly, once you’ve built your pipeline in SQL using the Studio notebooks, deploying it as an application means bundling it as a PyFlink ZIP and deploying that—something that for me didn’t work out of the box.

Ready to give it a go for yourself? You can find Amazon MSF here, and sign up for Decodable here.

Robin Moffatt

Robin is a Principal DevEx Engineer at Decodable. He has been speaking at conferences since 2009 including QCon, Devoxx, Strata, Kafka Summit, and Øredev. You can find many of his talks online and his articles on the Decodable blog as well as his own blog.

Outside of work, Robin enjoys running, drinking good beer, and eating fried breakfasts—although generally not at the same time.

One of the things that I love about SQL is the power that it gives you to work with data in a declarative manner. I want this thing…go do it. How should it do it? Well that’s the problem for the particular engine, not me. As a language with a pedigree of multiple decades and no sign of waning (despite a wobbly patch for some whilst NoSQL figured out they actually wanted to be NewSQL 😉), it’s the lingua franca of data systems.

Which is a long way of saying: if you work with data, SQL is mandatory. There, I said it. And if you’re transforming data as part of an ETL or ELT pipeline, it’s one of the simplest and most powerful ways to do it. So let’s see how we can use it with two different platforms in the market that are both built on Apache Flink but expose their support for SQL (specifically, Flink SQL) in very different ways. The first of these is Amazon’s MSF (Managed Service for Apache Flink), and the second is Decodable.

At this point a disclaimer is perhaps warranted, if not entirely obvious: I work for Decodable, and you’re reading this on a Decodable blog. So I’m going to throw MSF under the bus, obviously? Well, no. I’m not interested in that kind of FUD. Hopefully this blog will simply serve to illustrate the similarities and differences between the two.

Let’s build something!

I’m going to take a look at what it takes to build an example of a common pipeline—getting data from a Kafka topic and using SQL to create some aggregate calculations against the data and send it to an S3 bucket in Iceberg format.

I’ll start by exploring Amazon MSF and what it takes to build out the pipeline from the point of view of a data engineer who is deeply familiar with SQL—but not Java or Python (hence this blog - Flink SQL!). After that I’ll look at doing the same thing on Decodable and what the differences are.

Running SQL on Amazon MSF

After logging into the AWS Console and going to the MSF page, I have two options for getting started, <span class="inline-code">Streaming applications</span> or <span class="inline-code">Studio notebooks</span>.

CleanShot 2024-06-17 at 11.13.20

Having not used MSF before and following the logic that since I’m building a pipeline rather than just exploring data, I initially went down the route of creating a <span class="inline-code">Streaming application</span>. This turned out to be a dead-end, since it’s only if you’re writing Java or Python. Whilst you can wrap Flink SQL within these, it’s hardly the pleasant developer experience I was hoping for. I subsequently made my way over to <span class="inline-code">Studio notebooks</span>. This gives you what it says on the tin—a notebook. Beloved of data scientists and analysts, notebooks are brilliant for noodling around with bits of data, creating reproducible examples to share with colleagues, etc.

Jupyter and Apache Zeppelin are the two primary notebook implementations used, and here Amazon has gone with a pretty vanilla deployment of Zeppelin. This is what you get after creating the notebook and starting it up:

CleanShot 2024-06-17 at 12.04.25

This is almost too vanilla for my liking; as someone coming to MSF with the hope of doing some Flink SQL, I’m pretty much left to my own devices to figure out what to do next. I clicked on  <span class="inline-code">Create new note</span> and ended up with this:

CleanShot 2024-06-18 at 11.21.58@2x

’k cool. Now what?

kermit-the-frog-looking-for-directions-gif

(Ironically, if you were to run the latest version of Zeppelin yourself you’d find that it ships with a whole folder of Flink examples—which would have been rather useful here.)

Anyway, back on Amazon MSF I went back a screen and opened the  <span class="inline-code">Examples</span> notebook. This at least made more sense than a blank page. 🙂

CleanShot 2024-06-17 at 12.07.57

The  <span class="inline-code">%flink.ssql</span> in each cell denotes the interpreter that’s used to execute the code in that cell—there are several Flink interpreters available for Apache Zeppelin.

So let’s actually give it a go. As mentioned above, the pipeline we’re building is to stream data from a Kafka topic into Apache Iceberg on S3, calculating some aggregations on the way.

Reading data from Kafka with Amazon MSF

Having explored Flink SQL for myself over the past few months I knew to go and look up the Kafka connector for Flink. Connectors in Flink SQL are defined as tables, from which one can read and write data to the associated system. The  <span class="inline-code">Examples</span> notebook also gives, well, examples to follow too. To start with I wanted to just check connectivity, so I deleted out what was there in the cell and created a single-column table connecting to my cluster:

CleanShot 2024-06-18 at 11.47.27 2

Hmmm, no good.

<console>:5: error: unclosed character literal (or use " for string literal "connector")
          'connector' = 'kafka',
                    ^
<console>:5: error: unclosed character literal (or use " for string literal "kafka")
          'connector' = 'kafka',

I mean, it looks OK to me. After some circling around, I realised that I’d not only deleted the existing example code, but the interpreter string too. With the magic <span class="inline-code">%flink.ssql</span> restored, my table was successfully created:

CleanShot 2024-06-18 at 11.47.58

This is where the nice bit of notebooks comes in—I can build up this example step by step, and actually share the notebook at the end with you, dear reader, and you’ll be able to run it too. I can also add comments and narration, within the notebook itself.

CleanShot 2024-06-18 at 12.32.17
INSERT INTO msf_kafka_test_01 VALUES ('hello world') 

Now let’s run the <span class="inline-code">INSERT</span> shown here, which if all has gone well should result in a message in JSON format (as specified by <span class="inline-code">format</span> in the table definition) in my <span class="inline-code">test_from_msf</span> topic:

# Here's the topic
kcat -b broker:29092 -L
Metadata for all topics (from broker 1: broker:29092/1):
 1 brokers:
  broker 1 at broker:29092 (controller)
 1 topics:
  topic "test_from_msf" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1

# Here's the message
kcat -b broker:29092 -C -t test_from_msf
{"column1":"hello world"}
% Reached end of topic test_from_msf [0] at offset 1

Looks good! Let’s bring in the actual Kafka topic that we want to read from. The data is simulated transactions from a chain of supermarkets and looks like this:

{
    "customerId": "dc91f8a7-c310-8656-f3ef-f6f0a7af7e58",
    "customerName": "Laure Treutel",
    "customerAddress": "Suite 547 635 Brice Radial, New Jantown, NH 34355",
    "storeId": "401fdb6d-ecc2-75a5-8d0d-efdc64e8637b",
    "storeName": "Swaniawski Group",
    "storeLocation": "Taylorton",
    "products": [
        {
            "productName": "Mediocre Bronze Shoes",
            "quantity": 4.90896716771463,
            "unitPrice": 2.21,
            "category": "Garden, Industrial & Shoes"
        },
        {
            "productName": "Aerodynamic Iron Table",
            "quantity": 2.5936751424049076,
            "unitPrice": 15.79,
            "category": "Music"
        },
        […]
    ],
    "timestamp": "2024-06-16T16:11:21.695+0000"
}

Our first task is to come up with the DDL to declare this table’s schema. You can do this by hand, or trust to the LLM gods—I found Claude 3 Opus gave me what seemed like a pretty accurate answer:

CleanShot 2024-06-18 at 12.52.00

The only thing I needed to change was quoting the reserved word <span class="inline-code">timestamp</span>  being used as a field name. Using this I started a new notebook that would be for my actual pipeline, and began with creating the Kafka source:

CleanShot 2024-06-18 at 12.56.07

With the table created, let’s run a <span class="inline-code">SELECT</span> to sample the data to make sure things are working:

CleanShot 2024-06-18 at 14.19.18

Instead of fighting with type conversions at this stage (that fun can wait!) I’m just going to make this a <span class="inline-code">STRING</span> field and worry about it later. Unfortunately, MSF has other ideas:

CleanShot 2024-06-18 at 14.40.17

After some digging it turns out that <span class="inline-code">ALTER TABLE…MODIFY</span> was added in Flink 1.17 and MSF is running Flink 1.15.

Since we’re not using the table anywhere yet, let’s just drop it and recreate it.

CleanShot 2024-06-18 at 14.53.22

This is good, and bad. Mostly bad. The good bit is that it reminds me that when I created the notebook I selected an existing Glue catalog, and it looks like MSF is seamlessly using it. The bad? I need to go IAM-diving to figure out what the permission problem is here.

Or…sidestep the issue and just create the table with a different name 😁

CleanShot 2024-06-18 at 15.01.24

OK! Now we are getting somewhere. With data flowing in from our Kafka topic let’s build the second half of our pipeline, in which we’re going to compute aggregates on the data before sending it to Apache Iceberg.

Running Flink SQL to transform data on Amazon MSF

Whilst configuring the connection to Kafka might have been a bit more fiddly than we’d have liked, building actual transformational SQL in the Zeppelin notebook is precisely its sweet spot. You can iterate to your heart’s content to get the syntax right and dataset exactly as you want it. You can also take advantage of Zeppelin’s built-in visualisation capabilities to explore the data further:

CleanShot 2024-06-18 at 16.44.41

With the data exploration and noodling done, we’ll wrap our aggregate query as a <span class="inline-code">VIEW</span> to use later when we send it to Iceberg.

CREATE VIEW basket_agg AS 
SELECT customerName,
    COUNT(*) AS unique_items,
    SUM(quantity) AS unit_ct
FROM basket01
    CROSS JOIN UNNEST(products) AS t(productName, quantity, unitPrice, category)
GROUP BY customerName;

Here I’m using <span class="inline-code">UNNEST</span> to get at the individual records in the <span class="inline-code">products</span> array, and create a result set of the number of basket items (<span class="inline-code">COUNT(*)</span>) and total units (<span class="inline-code">SUM(quantity)</span>) broken down by customer (<span class="inline-code">customerName</span>).

Attempting to write to Apache Iceberg from Amazon MSF…

Just as we used a table to read data from Kafka above, we’ll use a table to write to Iceberg on S3.

If I were running this on my own Apache Flink cluster, I’d do this:

Flink SQL> CREATE TABLE basket_iceberg
           WITH (
           'connector' = 'iceberg',
           'catalog-type'='hive',
           'catalog-name'='dev',
           'warehouse' = 's3a://warehouse',
           'hive-conf-dir' = './conf') AS
           SELECT * FROM basket_agg; 

This is based on using Hive as a catalog metastore, whilst here I’m using Glue. I figured I’d slightly guess at the syntax of the command, assuming that I can randomly jiggle things from the resulting error to get them to work.

But I didn’t even get that far. MSF doesn’t seem to be happy with my<span class="inline-code">CREATE TABLE…AS SELECT</span>.

CleanShot 2024-06-19 at 16.00.55

As with <span class="inline-code">ALTER TABLE…MODIFY</span> above, CREATE TABLE…AS SELECT (CTAS) wasn’t added to Flink until 1.16, and MSF is on 1.15. Instead, we’re going to have to split it into two stages - create the table, and then populate it.

Unlike CTAS in which the schema is inherited from the<span class="inline-code">SELECT</span> , here we’re going to have to define the schema manually. We can get the schema from a <span class="inline-code">DESCRIBE</span> against the view:

CleanShot 2024-06-19 at 16.03.06

Copying this schema into our <span class="inline-code">CREATE TABLE</span> gives us this:

CleanShot 2024-06-19 at 16.04.18

So now to populate it…maybe.

CleanShot 2024-06-19 at 16.05.20

Ahhhh Java stack traces my old friend. How I’ve missed you.

not

The one we’ve got here boils down to this:

org.apache.flink.table.api.ValidationException: 
Could not find any factory for identifier 'iceberg' that implements
'org.apache.flink.table.factories.DynamicTableFactory' in the classpath. 

I’ve encountered this error previously. It means that the Flink installation only has a certain set of connectors installed, and not the one we want. The error message actually tells us which ones are available:

Available factory identifiers are:

blackhole
datagen
filesystem
kafka
kinesis
print
upsert-kafka

So how do we write to Iceberg? We need to install the Iceberg connector, obviously! The MSF docs explain how to do this. In essence, I need to put the Iceberg JAR in an S3 bucket, and restart the notebook with the dependency added.

Let’s grab the Iceberg JAR for Flink 1.15 (wanna know more about navigating JAR-hell? You’ll enjoy this blog that I wrote) and upload it to S3:

curl https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.15/1.4.3/iceberg-flink-runtime-1.15-1.4.3.jar -O
aws s3 cp iceberg-flink-runtime-1.15-1.4.3.jar s3://rmoff/iceberg-flink-runtime-1.15-1.4.3.jar

Now to add it as a custom connector:

CleanShot 2024-06-19 at 17.01.09

Unfortunately, MSF wasn’t happy with this. When I click on Create Studio notebook it fails after a minute or two with this:

CleanShot 2024-06-19 at 17.04.49
Please check the role provided or validity of S3 location you provided. We are unable to get the specified fileKey: s3://rmoff/iceberg-flink-runtime-1.15-1.4.3.jar in the specified bucket: rmoff

Ho hum. Since it says “please check the role” I went back a couple of screens to review the notebook’s configuration for IAM, which shows this:

CleanShot 2024-06-19 at 17.10.32

The IAM role is being created automagically, so things ought to Just Work? Right?

Getting started with Flink SQL on Amazon MSF vs Decodable1

So down the rabbit hole we go to the IAM policy editor. Looking up the permissions attached to the policy that MSF created shows, after a bit of scratching around, the problem:

CleanShot 2024-06-19 at 17.17.56

The problem wasn’t actually the IAM policy that MSF provisioned, but the information I’d given it to provision it with.

When I specified the Path to S3 Object I gave the fully qualified S3 URL, <span class="inline-code">s3://rmoff/iceberg-flink-runtime-1.15-1.4.3.jar</span>. What MSF actually wanted was the path within the S3 bucket that I’d also specified (<span class="inline-code">s3://rmoff</span>). MSF ended up concatenating the two and configuring IAM for a JAR file at <span class="inline-code">s3://rmoff/s3://rmoff/iceberg-flink-runtime-1.15-1.4.3.jar</span>. This is also confirmed if I go back to the connector configuration screen and look closely at the S3 URI shown as a result of my erroneous input:

CleanShot 2024-06-19 at 17.23.17

So, let’s remove that connector and re-add it with the correct path this time:

CleanShot 2024-06-19 at 17.24.13

I can also click on the hyperlink to confirm that the object is there:

CleanShot 2024-06-19 at 17.24.34

Having some guard rails provided by input validation would have been a nice touch here, but it wasn’t to be.

So. Back to starting the notebook up again.

CleanShot 2024-06-19 at 17.25.34

Woohoo! Let’s go open that notebook and get back to where we were before this irritating detour, now hopefully with Iceberg superpowers 🦸. Because we’ve got the Glue catalog storing our table definitions, we should be able to pick up where we left off. This time we get a different error when we try to write to the Iceberg sink—progress!

CleanShot 2024-06-19 at 17.56.36
Caused by: org.apache.flink.table.api.ValidationException: 
Unable to create a sink for writing table 'hive.rmoff.basket_iceberg_00'.

[…]
Caused by: java.lang.NullPointerException: 
Table property 'catalog-name' cannot be null

To be honest, I wasn’t expecting creating an Iceberg connection to be quite as simple as what I tried:

CREATE TABLE […]
    WITH ('connector' = 'iceberg') 

So now onto the fun and games of working out the correct parameters. The complication here—and the reason I can’t just copy and paste like any good developer reading StackOverflow—is that on MSF the catalog is provided by Glue, not Hive. The MSF documentation on connectors outside Kinesis and Kafka is pretty sparse, and there’s nothing for Iceberg that I could find. The documentation for Apache Iceberg’s Flink connector lists the properties required, and after a bit of poking around there I found this section about using it with a Glue catalog. There’s also this section on using Iceberg from Amazon EMR—which is not the same as MSF, but perhaps has the same pattern of access for Glue and S3 when writing Iceberg data. One point to note here is that the EMR document talks about creating Flink <span class="inline-code">CATALOG</span> objects—I’d recommend you review my primer on catalogs in Flink too if you haven’t already, since it’s an area full of ambiguity and overloaded terms.

Distilling down the above resources, I tried this configuration for the table next:

CREATE TABLE basket_iceberg_01 (
            customerName STRING,
            unique_items BIGINT,
            unit_ct DOUBLE)
       WITH ('connector'   = 'iceberg',
             'catalog-type'= 'glue',
             'catalog-name'= 'rmoff',
             'warehouse'   = 's3://rmoff/msf-test',
             'io-impl'     = 'org.apache.iceberg.aws.s3.S3FileIO') 

Here <span class="inline-code">catalog-name</span> matches my Glue catalog name. When I try to insert into it, I get a different error: <span class="inline-code">java.lang.ClassNotFoundException: org.apache.hadoop.hdfs.HdfsConfiguration</span>.

I mean, we’re only this far in and haven’t had a <span class="inline-code">ClassNotFoundException</span> yet…I think we’re doing pretty well 🤷? Working with MSF is starting to feel like the trials and tribulations of learning Apache Flink locally, but with one arm tied behind my back: I’m debugging a system running a version of Flink that’s two years old, with no filesystem access to check things like JARs.

Back to our story. I spent a bunch of time learning about JARs in Flink so was happy taking on this error. <span class="inline-code">ClassNotFoundException</span> means that Flink can’t find a Java class that it needs. It’s usually solved by providing a JAR that includes the class and/or sorting out things like classpaths so that the JAR can be found. Searching a local Hadoop installation (remember those?) turned up this:

Found in: /Users/rmoff/hadoop/hadoop-3.3.4/share/hadoop/hdfs/hadoop-hdfs-client-3.3.4.jar

But now I’m a bit puzzled. We’re hoping to write to S3, so why does it need an HDFS dependency?

On a slight tangent: looking at the table DDL I’m running, it uses the Flink Iceberg connector as described here to create a table directly, sidestepping the explicit Iceberg catalog creation usually seen in examples. Let’s try creating a catalog as seen here:

CREATE CATALOG c_iceberg WITH (
   'type'='iceberg',
   'warehouse'='s3://rmoff/msf-test',
   'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
   'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'
 ); 

But, same error: <span class="inline-code">java.lang.ClassNotFoundException: org.apache.hadoop.hdfs.HdfsConfiguration</span>.

Looking at the Iceberg docs again it seems I’ve perhaps missed a dependency, so back to creating a new notebook, this time with <span class="inline-code">hadoop-aws</span> and <span class="inline-code">iceberg-aws-bundle</span> added. I’d wanted to include <span class="inline-code">aws-java-sdk-bundle</span> since I’ve needed that when running things locally, but I guess that one wasn’t to be:

CleanShot 2024-06-19 at 22.00.46@2x
Total combined size for DEPENDENCY_JAR CustomArtifactConfiguration exceeded, 
maximum: 314572800, given: 524262131

So this is what I’ve got for this notebook’s attempt:

CleanShot 2024-06-19 at 22.06.05@2x

Still no dice—same error: <span class="inline-code">java.lang.ClassNotFoundException: org.apache.hadoop.hdfs.HdfsConfiguration</span>. Let’s take the bull by the horns and add the specific JAR file that contains this class (<span class="inline-code">hadoop-hdfs-client</span>):

CleanShot 2024-06-19 at 22.25.41@2x

After restarting the notebook I can see the JARs picked up in Zeppelin’s <span class="inline-code">flink.execution.jars</span>:

CleanShot 2024-06-19 at 22.30.01@2x

But… still… when I try to create an Iceberg catalog as above, the… same… error…<span class="inline-code">java.lang.ClassNotFoundException: org.apache.hadoop.hdfs.HdfsConfiguration</span>.

ahhh

So what am I going to do? What any sensible data engineer, given half the chance, would do at this point…give up and try something different. Instead of writing to Iceberg on S3, we’ll compromise and simply write parquet files to S3. This should hopefully be easier, since the Flink filesystem connector is one of the connectors included with MSF.

Attempting to write Parquet files to S3 from Amazon MSF…

This should be much simpler:

CREATE TABLE s3_sink  (
            customerName STRING,
            unique_items BIGINT,
            unit_ct DOUBLE) 
            WITH ('connector'='filesystem', 
                  'path' = 's3://rmoff/msf-test-parquet',
                  'format' = 'parquet');

INSERT INTO s3_sink 
SELECT * FROM basket_agg; 

But simple is boring, right?

org.apache.flink.table.api.ValidationException: 
Could not find any format factory for identifier 'parquet' in the classpath. 

Up and over to the dependencies again, since we need to add the necessary JAR for parquet format.

CleanShot 2024-06-19 at 22.54.43@2x

But can you spot what might be the problem?

CleanShot 2024-06-19 at 22.54.18@2x

I’d picked up the JAR for Flink 1.18.1—and MSF is running Flink 1.15. I don’t know for certain that the version difference is the problem, but from experience aligning versions is one of the first things to do when troubleshooting.

So, stop the notebook, update the JAR (it’s called “Custom Connectors” in the UI, but it seems to encompass any dependencies), start the notebook… and progress (of sorts):

CleanShot 2024-06-19 at 23.05.28@2x

Of course…we’ve created an aggregate, and we’re trying to write it to a file type which doesn’t support updates. Parquet is append-only, so as the aggregate values change for a given key, how can we represent that in the file?

So that we’ve not completely failed to build an end-to-end pipeline I’m going to shift the goalposts and change the aggregate to a filter, selecting all customers from the Kafka source whose names begin with the letter “b”:

CREATE VIEW customers_b AS 
SELECT customerId, customerName, customerAddress
FROM basket01
WHERE LOWER(customerName) LIKE 'b%'; 

Now let’s write the filtered data to Parquet on S3:

CREATE TABLE s3_customers_b (
     customerId STRING,
     customerName STRING,
     customerAddress STRING
) WITH (
     'connector' = 'filesystem',
     'path'      = 's3://rmoff/msf-test-parquet',
     'format'    = 'parquet'
);

INSERT INTO s3_customers_b SELECT * FROM customers_b; 

This works! For ten glorious seconds! And then…

CleanShot 2024-06-19 at 23.14.01@2x

A couple of StackOverflow answers suggest that even with the Parquet JAR I still need some Hadoop dependencies. So back to the familiar pattern:

  1. Close the notebook
  2. Force stop the notebook (it’s quicker, and I don’t care about data at this point)
  3. Wait for the force stop to complete
  4. Add hadoop-common-3.3.4.jar to the list of Custom connectors for the notebook
  5. Wait for the notebook to update
  6. ‘Run’ (start) the notebook
  7. Wait for the start to complete
  8. Open the notebook
  9. Rerun the failing statement

Unfortunately I’m now just off down the same whack-a-mole route as I was with Iceberg, this time with <span class="inline-code">java.lang.ClassNotFoundException: com.ctc.wstx.io.InputBootstrapper</span>:

CleanShot 2024-06-20 at 10.02.28

Even after adding <span class="inline-code">hadoop-client-runtime-3.3.4.jar</span>, I get the same error. So now I’m wondering about the versions. What version of Hadoop dependencies is Flink 1.15 going to want? I failed to find anything in the Flink docs about versions, so instead went to Maven Repository and picked a Hadoop version based on the date that was fairly close to the release of Flink 1.15 (May 2022)—3.2.2.

This was the last roll of the dice…

CleanShot 2024-06-20 at 10.51.15

Still no dice.

CleanShot 2024-06-20 at 11.06.12

Writing CSV or JSON files to S3 from Amazon MSF

So now I’m going to fall back from Iceberg…to Parquet…to CSV…something…anything…

Here’s the CSV table and <span class="inline-code">INSERT</span>:

CleanShot 2024-06-20 at 11.12.23
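
They’re in a screenshot again, but they were essentially the parquet table and <span class="inline-code">INSERT</span> with the format and path swapped, something like this (the table name here is illustrative):

-- CSV variant of the filesystem sink (sketch)
CREATE TABLE s3_customers_b_csv (
     customerId      STRING,
     customerName    STRING,
     customerAddress STRING
) WITH (
     'connector' = 'filesystem',
     'path'      = 's3://rmoff/msf-test-csv',
     'format'    = 'csv'
);

INSERT INTO s3_customers_b_csv SELECT * FROM customers_b;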

But hey—a different error 😃 And this one looks less JAR-related and probably more IAM.

java.nio.file.AccessDeniedException: msf-test-csv/part-3b4dd54f-e4f5-452f-9884-9d131908302c-0-0: initiate MultiPartUpload on msf-test-csv/part-3b4dd54f-e4f5-452f-9884-9d131908302c-0-0: 
com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: 5209TZF3D521ZQJP; S3 Extended Request ID: QIQm89EsUXNYlHVKUHApjZjY1abc4RjK65rvLUW2BdtOC62AGgV99XjMwdlBNVJVVrOIHIGMoBaLqzh/038gyA==; Proxy: null) 

This actually seems fair enough; looking at the IAM details from the AWS console, the IAM role that MSF automagically configures only has access on S3 to the three JARs that I uploaded when I was trying to get parquet to work:

{
    "Sid": "ReadCustomArtifact",
    "Effect": "Allow",
    "Action": [
        "s3:GetObject",
        "s3:GetObjectVersion"
    ],
    "Resource": [
        "arn:aws:s3:::rmoff/hadoop-common-3.2.2.jar",
        "arn:aws:s3:::rmoff/hadoop-client-runtime-3.2.2.jar",
        "arn:aws:s3:::rmoff/flink-sql-parquet-1.15.0.jar"
    ]
},

I added a second policy just to try to get things to work for now. It’s very permissive, and overlaps with the <span class="inline-code">ReadCustomArtifact</span> one (which arguably is redundant anyway if we don’t need any of the parquet and dependency JARs):

{
    "Sid": "ReadAndWriteData",
    "Effect": "Allow",
    "Action": [
        "s3:*"
    ],
    "Resource": [
        "arn:aws:s3:::rmoff/*"
    ]
}

Back to the notebook and things are finally looking hopeful:

CleanShot 2024-06-20 at 11.29.58

Two minutes and no error yet! But also…no data yet:

$ aws s3 ls s3://rmoff/msf-test-csv

$

Twiddling my thumbs and clicking around a bit I notice this little FLINK JOB icon appeared a short while after running the statement:

CleanShot 2024-06-20 at 12.03.19

Clicking on it takes me to the vanilla Flink web dashboard:

CleanShot 2024-06-20 at 12.04.35

CleanShot 2024-06-20 at 12.16.56

This is nice, but doesn’t help me with where my CSV files are. A bit more digging around leads me to the FileSystem documentation and specifically the section about when the part files are written. The <span class="inline-code">sink.rolling-policy.rollover-interval</span> defaults to 30 minutes. Sure enough, after 30 minutes (ish) something does happen:

CleanShot 2024-06-20 at 12.21.28
java.lang.UnsupportedOperationException: 
S3RecoverableFsDataOutputStream cannot sync state to S3. 
Use persist() to create a persistent recoverable intermediate point.

Funnily enough, I’ve hit a very similar error previously, which then leads me to FLINK-28513. Unfortunately this is a bug in Flink 1.15 that was fixed in 1.17 and later…but MSF is on 1.15, so we’re stuck with it.

Continuing our descent into oblivion in search of a file format—any file format!—that we can successfully write to S3, let’s try JSON:

CREATE TABLE s3_customers_b_json_01 (
     customerId      STRING,
     customerName    STRING,
     customerAddress STRING
) WITH (
     'connector' = 'filesystem',
     'path'      = 's3://rmoff/msf-test/customers_b-json',
     'format'    = 'json',
     'sink.rolling-policy.rollover-interval' = '30sec'
); 

I’ve also overridden the default <span class="inline-code">sink.rolling-policy.rollover-interval</span>, setting it to 30 seconds. If I understand the documentation correctly, Flink should roll over to a new part file every 30 seconds.

Let’s try it!

INSERT INTO s3_customers_b_json_01
    SELECT * FROM customers_b; 

After a break for tea, I came back to find the job still running (yay!)

CleanShot 2024-06-20 at 15.38.36

but no data on S3 (boo!)

$ aws s3 ls --recursive s3://rmoff/msf-test

$

The final piece of the puzzle that I’ve been missing is checkpointing. By default this is disabled, which can be seen here:

CleanShot 2024-06-20 at 15.41.17

I can’t see a way to force a Flink SQL job to checkpoint, so canceled the <span class="inline-code">INSERT</span> and re-ran it with checkpointing enabled:

SET 'execution.checkpointing.interval' = '10sec';

INSERT INTO s3_customers_b_json_01
    SELECT * FROM customers_b; 

Ten seconds is pretty aggressive for checkpointing, but then I am pretty impatient when it comes to testing that something is working as I hope. And the good news is that eventually, mercifully, it has worked.

aws s3 ls --recursive s3://rmoff/msf-test
2024-06-20 15:53:07      90187 msf-test/customers_b-json/part-09af660f-a216-4282-9d7c-fc0ad7225749-0-0
2024-06-20 15:54:05      92170 msf-test/customers_b-json/part-09af660f-a216-4282-9d7c-fc0ad7225749-0-1

How frequently you checkpoint, and how you set your file rolling policy, depends on how many small files you’re prepared to deal with and how soon you want to be able to read the data that’s being ingested. Ten seconds is almost certainly what you do not want—if you need the data that soon then you should be reading it from the Kafka topic directly.
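
For something less aggressive you might land on values more like these: say, checkpoint every minute and roll files every 15 minutes (purely illustrative; the table name and path suffix here are made up for the example):

-- Illustrative only: checkpoint every minute, roll part files every 15 minutes
SET 'execution.checkpointing.interval' = '60sec';

CREATE TABLE s3_customers_b_json_02 (
     customerId      STRING,
     customerName    STRING,
     customerAddress STRING
) WITH (
     'connector' = 'filesystem',
     'path'      = 's3://rmoff/msf-test/customers_b-json-02',
     'format'    = 'json',
     'sink.rolling-policy.rollover-interval' = '15min'
);

INSERT INTO s3_customers_b_json_02 SELECT * FROM customers_b;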

📓 You can find the source for the notebook here.

Wrapping things up on MSF

Well, what a journey that was. We eventually got something working to prove out an end-to-end SQL pipeline. We had to make a bunch of compromises—including ditching the original requirement to aggregate the data—which in real life would be a lot more difficult to stomach, or would have more complicated workarounds. But we’ve got a notebook that defines a streaming pipeline. Yay us! The final step with something like this would be to run it as an application. Notebooks are great for exploration and experimentation, but now we have something that works and we don’t want to have to launch Zeppelin each time to run it.

Running an MSF notebook as an application

MSF has an option to deploy a notebook as a streaming application, although the list of caveats is somewhat daunting. The one which I’m stumbling on is this one:

we only support submitting a single job to Flink.

Looking at the Flink Dashboard I can see two jobs: the Kafka source, and the S3 sink:

CleanShot 2024-06-20 at 17.19.06 1

Let’s give it a try anyway. After stopping the notebook from the console, there’s an option under Configuration called Deploy as application configuration, which after putting my bucket name in looks like this:

CleanShot 2024-06-20 at 17.28.27

Now after clicking Run on the notebook it starts up again, and I figure it’s deployed it as an application? Maybe? But there’s nothing in the S3 bucket, and the notebook seems to just open as normal, so I’m a bit puzzled as to what’s going on.

It turns out that what’s not mentioned on the docs page itself—but is in a related tutorial—is that there are more steps to follow. First, build the application from the notebook:

CleanShot 2024-06-24 at 10.07.13

CleanShot 2024-06-24 at 10.07.55

…but at this point it already has a problem.

CleanShot 2024-06-24 at 10.08.44
Kafka→⚙️→S3 (JSON) has failed to build and export to Amazon S3: 
Statement is unsupported for deploying as part of a note. 
Only statements starting with INSERT, UPDATE, MERGE, DELETE, 
USE, CREATE, ALTER, DROP, SET or RESET are supported. 
SELECT * FROM basket01 LIMIT 5

So, it looks like my notebook—in which I’ve done what one does in a notebook, namely explore, iterate, and evolve a data processing pipeline—is not OK as it is. I need to go back through it and trim it down to remove the interactive bits (the ones that made it useful as a notebook, really).

The Kafka source and S3 sink tables and the view that I’m using to filter the data for the sink are held in the Glue catalog:

CleanShot 2024-06-24 at 10.18.25

So all I need in my notebook now is the actual ‘application’—the <span class="inline-code">INSERT INTO</span> streaming SQL that connects them all together. I’ve stripped it back and saved it as a new note:

CleanShot 2024-06-24 at 10.19.44
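
In text form, the whole note now boils down to just this (the same two statements that show up in the generated <span class="inline-code">note.py</span> below):

SET 'execution.checkpointing.interval' = '60sec';

INSERT INTO s3_customers_b_json_01
    SELECT * FROM customers_b;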

Now when I build it MSF is happy:

CleanShot 2024-06-24 at 10.22.11

and in my S3 bucket I can see the generated code:

$ aws s3 ls s3://rmoff/rmoff-test-04/zeppelin-code/
2024-06-24 10:22:02  447072452 rmoff-test-04-Kafka____S3__JSON_-2JZ3CVU11-2024-06-24T09:22:01.370764Z.zip

Being of a curious bent, I of course want to know how two SQL statements compile into an application of over 400 MB, so I download the ZIP file. Among a bunch of Python files is this one, <span class="inline-code">note.py</span>:

# This file was synthesized from a Kinesis Analytics Zeppelin note.

from flink_environment import s_env, st_env, gateway
from pyflink.common import *
from pyflink.datastream import *
from pyflink.table import *
from pyflink.table.catalog import *
from pyflink.table.descriptors import *
from pyflink.table.window import *
from pyflink.table.udf import *
from zeppelin_context import z, __zeppelin__

import pyflink
import errno
import os
import json
import textwrap
import sys


print("Executing %flink.ssql paragraph #1")
st_env.execute_sql(
    textwrap.dedent(
        """
        SET 'execution.checkpointing.interval' = '60sec'
        """
    ).strip()
)
statement_set = st_env.create_statement_set()
statement_set.add_insert_sql(
    textwrap.dedent(
        """
        INSERT INTO s3_customers_b_json_01
            SELECT * FROM customers_b
        """
    ).strip()
)
statement_set.execute()

So with the note built into an application ZIP on S3, we can now deploy it. Somewhat confusingly for a product called Managed Service for Apache Flink, we’re now going to deploy the application as a “Kinesis Analytics” application—harking back to the previous name of the product.

CleanShot 2024-06-24 at 10.38.59

CleanShot 2024-06-24 at 10.40.24

This isn’t a one-click deploy, but takes me through to the “Create streaming application” page of MSF with pre-populated fields.

CleanShot 2024-06-24 at 10.41.31

The next step is to click Create streaming application, which deploys the Python app that was built from our notebook.

CleanShot 2024-06-24 at 10.42.28

Finally, we can Run our application:

CleanShot 2024-06-24 at 10.45.05

At this stage, in theory, we should see the records from our Kafka topic come rolling into the S3 buckets, filtered for only customers whose name begins with <span class="inline-code">b</span>…

The application is running:

CleanShot 2024-06-24 at 10.47.50

But on the Flink dashboard that’s available with MSF applications, I don’t see any jobs:

CleanShot 2024-06-24 at 10.49.37

Looking closely at the MSF application dashboard, I can see a Logs tab under Monitoring, so I click on that—and there are some errors.

CleanShot 2024-06-24 at 10.50.43

It looks like <span class="inline-code">Run python process failed</span> is the root error, and unfortunately there’s no detail as to why, other than <span class="inline-code">java.lang.RuntimeException: Python process exits with code: 1</span>.

At this point I YOLO’d out—getting some Flink SQL to work in the notebook was trouble enough, and deploying it as an application will need to be a job for another day…if ever.

The final and most important step in my MSF exploration was to stop the streaming application and the notebook, since both cost money (even if they are not processing data).

CleanShot 2024-06-17 at 12.25.59

Phew…so that was Amazon MSF (née Kinesis Analytics). What does it look like getting started with Flink SQL on Decodable?

Running Flink SQL on Decodable

Decodable is all about doing things the easy—but powerful—way. Not because you’re lazy, but because you’re smart. Where’s the business differentiation to be had in who can write more boilerplate Flink SQL? So things like connectors don’t even need SQL; they’re built into the product as first-class resources.

Reading data from Kafka with Decodable

From the home page of the product I’m going to connect to Kafka, and I’ll do it by clicking on the rather obvious “Connect to a source”:

CleanShot 2024-06-18 at 16.02.19

From the list of source connectors I click on the Kafka connector…

CleanShot 2024-06-18 at 16.05.15 1

and configure the broker and topic details:

Next I need to give Decodable a destination for this data:

On the next screen I need to provide the schema. If I had a proper schema to hand I’d use it; for now I’ll take advantage of the schema inference and just give it a sample record:

CleanShot 2024-06-18 at 16.09.50

From this it picks up the schema automagically. I could refine it further if necessary, but it’s good for now:

CleanShot 2024-06-18 at 16.10.22

Once I’ve created the connection I click Start and shortly after the metrics screen confirms that data is being read successfully from Kafka.

CleanShot 2024-06-18 at 16.19.25

Let’s check that we can indeed see this data by clicking on the stream that’s linked to from the connector page and using the preview feature:

CleanShot 2024-06-18 at 16.20.52

Looking good!

Let’s now look at the next bit of the pipeline; processing the data to create aggregates.

Flink SQL in Decodable

From the stream view of the data, I click on Outputs and then Create a pipeline:

CleanShot 2024-06-18 at 16.22.15

This takes me to a SQL editor. I’m using pretty much the same SQL as I did originally with MSF above. The preview is useful to make sure my SQL compiles and returns the expected data:

CleanShot 2024-06-18 at 16.32.31

You may notice the <span class="inline-code">INSERT INTO</span> there too. This creates another stream which holds the continually-computed result of my query. As new records are sent to the Kafka topic, they’re read by Decodable, processed by my SQL pipeline, and the <span class="inline-code">basket_agg</span> stream is updated. This is the stream that we’re going to send to Iceberg in the next section.

The final thing to do, since we want to use <span class="inline-code">customerName</span> as the primary key for the results (that is, the column by which the aggregate values are summarised), is to apply a <span class="inline-code">COALESCE</span> to <span class="inline-code">customerName</span> so that it is by definition a <span class="inline-code">NOT NULL</span> field. The final SQL looks like this:

INSERT INTO basket_agg 
SELECT COALESCE(customerName,'unknown') AS customerName,
    COUNT(*) AS unique_items,
    SUM(quantity) AS unit_ct
FROM basket01
    CROSS JOIN UNNEST(products) AS t(productName, quantity, unitPrice, category)
GROUP BY customerName

With the SQL created and tested, on the next screen we define the stream and set its primary key explicitly:

CleanShot 2024-06-24 at 12.42.48

Finally, I’ll save the pipeline:

CleanShot 2024-06-24 at 12.44.56

and then start it. Depending on the amount of data being processed and the complexity of the pipeline calculations, I could adjust the number and size of the tasks used for processing, but the defaults are fine for now:

CleanShot 2024-06-24 at 12.45.51

Now I can see data coming in from my Kafka source (<span class="inline-code">basket01</span>) and out to an aggregate stream (<span class="inline-code">basket_agg</span>):

CleanShot 2024-06-24 at 12.51.32

Writing to Apache Iceberg from Decodable

The final part here is to wire up our new stream, <span class="inline-code">basket_agg</span>, to the Apache Iceberg sink. From the stream we can directly add a sink connection:

CleanShot 2024-06-24 at 14.36.34
CleanShot 2024-06-24 at 14.37.02

Then it’s just a matter of entering the type of catalog (AWS Glue), table details, and an IAM role with the necessary permissions:

CleanShot 2024-06-24 at 14.40.52

Finally, save the connection

CleanShot 2024-06-24 at 14.41.56

and start it. With that, we’re up and running!

CleanShot 2024-06-24 at 15.14.09
aws s3 ls --recursive s3://rmoff/rmoff.db/basket_agg_01/
2024-06-24 15:13:31     752343 rmoff.db/basket_agg_01/data/00000-0-be4c1e53-17c1-429c-a4c9-7680f92a2eac-00001.parquet
2024-06-24 15:13:32     206222 rmoff.db/basket_agg_01/data/00000-0-be4c1e53-17c1-429c-a4c9-7680f92a2eac-00002.parquet
2024-06-24 15:13:30     122327 rmoff.db/basket_agg_01/data/00000-0-be4c1e53-17c1-429c-a4c9-7680f92a2eac-00003.parquet
[…]

For completeness, here’s the Iceberg data queried through Amazon Athena:

CleanShot 2024-06-24 at 15.26.42
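
The Athena query itself is nothing fancy; a sketch, assuming the database and table names match what the connector created (per the S3 listing above):

-- Query the Iceberg table from Athena (sketch)
SELECT customerName,
       unique_items,
       unit_ct
FROM rmoff.basket_agg_01
ORDER BY unit_ct DESC
LIMIT 10;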

Running real-time ETL on Decodable in production

In the same way that I looked at what it means to move an MSF notebook through to deployment as an application, let’s see what that looks like on Decodable. In practice, what we’ve built is an application. There are no further steps needed. We can monitor it and control it through the web interface (or CLI/API if we’d rather).

Decodable has comprehensive declarative resource management built into the CLI. This means that if we want to store the resource definitions in source control we can download them easily, whether that’s to restore them to another environment or to promote them as part of a CI/CD process.

Conclusion

We’ve seen how two platforms built on Apache Flink offer access to Flink SQL.

On Decodable, the only SQL you need to write is if you want to transform data. The ingest and egress of data is done with connectors that just require configuration through a web UI (or CLI if you’d rather). Everything is built and deployed for you, including any dependencies. You just bring the config and the business logic; we do the rest. Speaking of REST, there’s an API to use too if you’d like :)

On Amazon MSF, SQL is used for building connectors as well as transforming data. To use a connector other than those included means making the JAR and its dependencies available to MSF. Each time you change a dependency you need to restart the notebook, so iteration can be a frustratingly slow process. The version of Flink (1.15) also doesn’t help, as several features (such as <span class="inline-code">CREATE TABLE…AS SELECT</span>) aren’t available. Lastly, once you’ve built your pipeline in SQL using the Studio Notebooks, deploying it as an application means bundling it as a PyFlink ZIP and deploying that—something that for me didn’t work out of the box.

Ready to give it a go for yourself? You can find Amazon MSF here, and sign up for Decodable here.


Robin Moffatt

Robin is a Principal DevEx Engineer at Decodable. He has been speaking at conferences since 2009 including QCon, Devoxx, Strata, Kafka Summit, and Øredev. You can find many of his talks online and his articles on the Decodable blog as well as his own blog.

Outside of work, Robin enjoys running, drinking good beer, and eating fried breakfasts—although generally not at the same time.