February 27, 2024
4 min read

Flink SQL and the Joy of JARs

By
Robin Moffatt

I will wager you half of my lottery winnings from 2023[1] that you're going to encounter this lovely little error at some point on your Flink SQL journey:

Could not execute SQL statement. Reason: java.lang.ClassNotFoundException

It comes in a variety of flavours, such as this one:

Flink SQL> CREATE CATALOG c_hive WITH (
>        'type' = 'hive',
>        'hive-conf-dir' = './conf/'
>    );
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.hadoop.fs.FSDataInputStream

In terms of cause, ClassNotFoundException is about as generic and common as errors get. It means that part of the code (a 'class') that the program needs to run can't be found. In practice, it means that you're missing one or more JAR files, or that you have them but they're not getting loaded, perhaps because they're in the wrong location. A JAR file is a "Java ARchive": a set of Java resources such as compiled code, bundled and compressed in much the same way as a ZIP file.
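Since it's essentially a ZIP file, you can poke around inside a JAR with standard tools. A quick illustration (the JAR name here is just an example; use any JAR you have to hand):

# List the contents of a JAR; it's just a ZIP archive with a different extension
unzip -l flink-json-1.18.1.jar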

To fix it you need the right JAR, and that JAR in the right place. Let's look at this in a bit more detail.

Identifying the correct JAR

Perhaps it'll be clear which JAR you need based on what you're trying to do—but often not. As you get more familiar with Flink you'll learn the common patterns of dependencies. These include:

  • Hadoop, including—but not only—if you're using Hive or Hive Metastore. You can go and get the specific JARs one-by-one, or just make the whole Hadoop distribution available to Flink by setting the HADOOP_CLASSPATH as detailed in my earlier blog post and covered in the documentation too.
  • Connectors, including those within the Flink project such as Apache Kafka—but also those outside of it, such as Apache Iceberg (which you might perhaps think of as a format rather than a connector).
  • Formats, such as Parquet
  • Filesystems, such as S3

The easy-but-dangerous route is Google. It might shortcut you to the right JAR, but it might also send you off down a rathole of irrelevant links and outdated solutions. Beware finding a solution that looks like a hammer for your nail of a problem if you've actually got a screw!

JAR Naming

Pay very close attention to names and versions of your JAR files. It might look like a bunch of word-and-number salad, but herein can lie the root cause of many a problem.

[Image: the anatomy of a JAR file name, broken down into its component parts]

Make sure that you line up the package name with what you actually need. For example:

  • flink-parquet is different from flink-sql-parquet, since the latter includes all of its dependencies whilst the former doesn't. This is a standard used in the naming of Flink JARs: a flink-sql- prefix means the dependencies are bundled in, whilst a plain flink- prefix means they aren't.
  • 1.14.6 is a pretty old version of Flink if you're using 1.18.1, so a JAR intended for it is unlikely to work. In fact, it's generally a bad idea to mix versions, even if it appears to work (a quick way to spot stragglers is sketched just after this list).
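As promised, a quick way to spot version stragglers: list any JAR under your Flink ./lib folder whose name doesn't mention your Flink version. A rough sketch, assuming Flink 1.18.1; third-party JARs (such as Hadoop's) won't carry the Flink version in their name, so expect some noise:

# List JARs under ./lib whose file name doesn't include the Flink version
find lib -name '*.jar' | grep -v '1\.18\.1'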

When you start integrating with other systems you'll also see their versions come into play, like this:

[Image: a JAR file name that includes both a third-party system version and a Flink version]

Finding JARs

Maven Central Repository is where you'll find most of the JARs you need. It's used widely in the Java (and adjacent) communities for publishing and retrieving open-source libraries. If you're coming to it as someone who just wants to get Flink SQL working, you might find it a bit confusing (I did), and navigating it takes a little practice.

Let's say we want the JAR necessary to use Parquet with the Flink SQL client. With the ever-so-useful benefit of hindsight, we'd know to go to the Flink docs, find the Parquet format page, and click on "Download" under SQL Client.

[Screenshot: the Flink documentation's Parquet format page, with the SQL Client download link]

(If you do this, you'll notice that the JAR linked to is indeed on Maven Central.)

Now, that presumes that we knew we'd find information about Parquet under "Connectors" (…okay?), and then Table API Connectors (…riight?), and then Formats (yep, that one makes sense). If we didn't, we'd do what anyone does: Google it. Google then tells you that you need a flink-sql-parquet JAR, and off you go. Searching for this on Maven Central gives us this:

[Screenshot: Maven Central search results for flink-sql-parquet]

The first thing to check is the Latest version and Published fields. You want your JARs lined up with your Flink distribution, and since I'm using 1.18.1 I can discount the latter two results, which show their latest version as 1.14.6 (with the _2.12 and _2.11 suffixes).

Clicking on flink-sql-parquet takes us to the overview page, with no download link:

[Screenshot: the Maven Central overview page for flink-sql-parquet]

If you're anything like me in your web browsing, cool your boots at this point: stop randomly clicking stuff and be patient. Don't click away, because we're nearly there.

Click on Versions, and then Browse for the 1.18.1 version:

[Screenshot: the Maven Central versions list, with the Browse link for 1.18.1]

The last page is a blast from the past—and if we're going to be nostalgic, a refreshing break from the jarring visual junk yards that are web pages today 😉—with just a directory listing.

[Screenshot: the Maven repository directory listing for flink-sql-parquet 1.18.1]

Look very carefully: you want the .jar file (but not the -sources one).

[Screenshot: the same directory listing, with the .jar file highlighted]

One last tip on this: if you have a Maven URL for a JAR and want a different version (or a different package), you can just strip the URL back to its parent folders to go up the hierarchy, and then navigate back down again. For example:

https://repo1.maven.org/maven2/org/apache/flink/flink-sql-parquet/1.18.1
👇
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-parquet/
👇
https://repo1.maven.org/maven2/org/apache/flink/
👇
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/
👇
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.0.2-1.18/
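Once you've landed on the right directory, you can of course also fetch the JAR from the command line instead of via the browser. For example, for the Parquet JAR from above:

# Download the JAR from Maven Central, keeping its original file name
curl -fLO https://repo1.maven.org/maven2/org/apache/flink/flink-sql-parquet/1.18.1/flink-sql-parquet-1.18.1.jar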

Putting the JAR in the right place

JARs should go under the ./lib folder of your Flink installation (or installations, if you're running it as a distributed cluster).

You can create subfolders under ./lib if you like, to keep things tidy:

lib
├── aws
│   ├── aws-java-sdk-bundle-1.12.648.jar
│   └── hadoop-aws-3.3.4.jar
[…]
├── flink-table-runtime-1.18.1.jar
├── formats
│   └── flink-sql-parquet-1.18.1.jar
├── hadoop
│   ├── commons-configuration2-2.1.1.jar
│   ├── commons-logging-1.1.3.jar
│   ├── hadoop-auth-3.3.4.jar
[…]
│   ├── stax2-api-4.2.1.jar
│   └── woodstox-core-5.3.0.jar
├── hive
│   └── flink-sql-connector-hive-3.1.3_2.12-1.18.1.jar
[…]
└── log4j-slf4j-impl-2.17.1.jar

You'll notice that I've got a hadoop folder with just a small number of JARs in it. These are just the bare essentials that I found Flink needed. However, the proper (and more reliable) way, as documented, is to set HADOOP_CLASSPATH pointing to your Hadoop download:

export HADOOP_CLASSPATH=$(/hadoop-3.3.4/bin/hadoop classpath)

The HADOOP_CLASSPATH gets added to the classpath (via the INTERNAL_HADOOP_CLASSPATH variable) with which Flink is launched.
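If you're curious what actually ends up on the classpath this way, you can run the same command yourself, putting each entry on its own line for readability:

# Show what the hadoop classpath command expands to, one entry per line
/hadoop-3.3.4/bin/hadoop classpath | tr ':' '\n'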

A final note on HADOOP_CLASSPATH is that it's an environment variable local to the session in which you set it. So if you do this:

# terminal session 1
./bin/start-cluster.sh
# terminal session 2
export HADOOP_CLASSPATH=$(/hadoop-3.3.4/bin/hadoop classpath)
./bin/sql-client.sh

You'll find that the SQL Client seems to work just fine (at least, doesn't throw an error):

Flink SQL> INSERT INTO `c_hive`.`db_rmoff`.t_foo VALUES ('a', 42);

But querying doesn't work:

Flink SQL> select * from `c_hive`.`db_rmoff`.t_foo;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration

This is because the Flink taskmanager, started as part of ./bin/start-cluster.sh as shown above, didn't have HADOOP_CLASSPATH set when it was launched.
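The fix is to make sure that HADOOP_CLASSPATH is exported in the same session as the one from which you start the cluster, before you start it:

# Set the classpath first, then start everything from the same session
export HADOOP_CLASSPATH=$(/hadoop-3.3.4/bin/hadoop classpath)
./bin/start-cluster.sh
./bin/sql-client.sh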

Checking where Flink is looking for JARs

You can inspect the classpath that each Flink process is running with by using the jinfo command:

$ jinfo $(pgrep -f org.apache.flink.table.client.SqlClient) | grep java_class_path 
java_class_path (initial): /Users/rmoff/flink/flink-1.18.1/lib/aws/aws-java-sdk-bundle-1.12.648.jar:/Users/rmoff/flink/flink-1.18.1/lib/aws/hadoop-aws-3.3.4.jar:/Users/rmoff/flink/flink-1.18.1/lib/flink-cep-1.18.1.jar:/Users/rmoff/flink/flink-1.18.1/lib/flink-connector-files-1.18.1.jar:/Users/rmoff/flink/flink-1.18.1/lib/flink-csv-1.18.1.jar:/Users/rmoff/flink/flink-1.18.1/lib/flink-json-1.18.1.jar:/Users/rmoff/flink/flink-1.18.1/lib/flink-scala_2.12-1.18.1.jar:/Users/rmoff/flink/flink-1.18.1/lib/flink-table-api-java-uber-1.18.1.jar:/Users/rmoff/flink/flink-1.18.1/lib/flink-table-planner-loader-1.18.1.jar:/Users/rmoff/flink/flink-1.18.1/lib/flink-table-runtime-1.18.1.jar:/Users/rmoff/flink/flink-1.18.1/lib/hadoop/commons-configuration2-2.1.1.jar:/Users/rmoff/flink/flink-1.18.1/lib/hadoop/commons-logging-1.1.3.jar:/Users/rmoff/flink/flink-1.18.1/lib/hadoop/hadoop-auth-3.3.4.jar:/Users/rmoff/flink/flink-1.18.1/lib/hadoop/hadoop-common-3.3.4.jar:/Users/rmoff/flink/flink-1.18.1/lib/hadoop/hadoop-hdfs-client-3.3.4.jar:/Users/rmoff/flink/flink-1.18.1/lib/hadoop/hadoop-mapreduce-client-core-3.3.4.jar:/Users/rmoff/flink/flink-1.18.1/lib/hadoop/hadoop-shaded-guava-1.1.1.jar:/Users/rmoff/flink/flink-1.18.1/lib/hadoop/stax2-api-4.2.1.jar:/Users/rmoff/flink/flink-1.18.1/lib/hadoop/woodstox-core-5.3.0.jar:/Users/rmoff/flink/flink-1.18.1/lib/hive/flink-sql-connector-hive-3.1.3_2.12-1.18.1.jar:/Users/rmoff/flink/flink-1.18.1/lib/iceberg/iceberg-aws-bundle-1.4.3.jar:/Users/rmoff/flink/flink-1.18.1/lib/iceberg/iceberg-flink-runtime-1.17-1.4.3.jar:/Users/rmoff/flink/flink-1.18.1/lib/log4j-1.2-api-2.17.1.jar:/Users/rmoff/flink/flink-1.18.1/lib/log4j-api-2.17.1.jar:/Users/rmoff/flink/flink-1.18.1/lib/log4j-core-2.17.1.jar:/Users/rmoff/flink/flink-1.18.1/lib/log4j-slf4j-impl-2.17.1.jar:/Users/rmoff/flink/flink-1.18.1/lib/flink-dist-1.18.1.jar:/Users/rmoff/flink/flink-1.18.1/opt/flink-python-1.18.1.jar:/Users/rmoff/flink/flink-1.18.1/opt/flink-sql-gateway-1.18.1.jar::::/Users/rmoff/fli

With a bit more Bash we can display it in a way that's much easier to read, splitting out just the classpath and putting each entry on a new line:

$ jinfo $(pgrep -f org.apache.flink.table.client.SqlClient) | grep java_class_path | tr ':' '\n'

/Users/rmoff/flink/flink-1.18.1/lib/aws/aws-java-sdk-bundle-1.12.648.jar
/Users/rmoff/flink/flink-1.18.1/lib/aws/hadoop-aws-3.3.4.jar
/Users/rmoff/flink/flink-1.18.1/lib/flink-cep-1.18.1.jar
/Users/rmoff/flink/flink-1.18.1/lib/flink-connector-files-1.18.1.jar
[…]
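The same trick works for the other Flink processes, which is exactly what you need when (as above) the SQL Client is fine but the taskmanager is missing a JAR. A sketch, assuming a standalone cluster; the main class name here is what my 1.18 installation runs, so check the ps output if pgrep finds nothing:

# Inspect the classpath of the taskmanager process
jinfo $(pgrep -f org.apache.flink.runtime.taskexecutor.TaskManagerRunner) | \
  grep java_class_path | tr ':' '\n'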

Don't Forget to Restart All the Flink Processes

When you add or remove a JAR, you need to make sure to restart all the Flink components so that they pick up the change. Some interactions the SQL Client handles directly; others get submitted to the job manager, and you'll only hit an error at that point.

Particularly if you're frantically dropping JARs in and out to try and fix a dependency issue, make a point of deliberately restarting all the processes each time. Otherwise you'll find that something 'suddenly' stops working, because it was only after numerous changes that you restarted a process, and now you can't remember which permutation things were in when it did work.
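With a local standalone cluster, a belt-and-braces restart looks like this (both scripts ship with the Flink distribution); remember to restart your SQL Client session too:

# Bounce all the locally-running Flink processes so they pick up JAR changes
./bin/stop-cluster.sh
./bin/start-cluster.sh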

Have you got the right JAR? Looking Inside a JAR 🔎

So you've hit the dreaded ClassNotFoundException error. You know that it's usually down to not having the right JAR loaded by Flink SQL. But what if you think you've got the right JAR and want to confirm it?

Let's take this example:

Flink SQL> CREATE CATALOG c_hive WITH (
>        'type' = 'hive',
>        'hive-conf-dir' = './conf/'
>    );
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.hadoop.fs.FSDataInputStream

A JAR file is just an archive of code, and using the -t flag of the jar command (combined with -f to specify the file) we can list its contents. Let's verify that the JAR we think provides this class does indeed do so:

$ jar -tf hadoop-common-3.3.2.jar | grep org.apache.hadoop.fs.FSDataInputStream
org/apache/hadoop/fs/FSDataInputStream.class

Yes it does! We can also search a folder for a given class, with this little bit of Bash:

# Search every JAR under a folder for the missing class
find ~/hadoop-3.3.4 -name '*.jar' | while read -r jar; do
    if jar -tf "$jar" | grep -q 'org.apache.hadoop.fs.FSDataInputStream'; then
        echo "Found in: $jar"
    fi
done
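If you find yourself doing this a lot, it's worth parameterising. A hypothetical little helper script (the name and arguments are mine, not a standard tool):

#!/usr/bin/env bash
# find-class.sh: search every JAR under a folder for a given class
# Usage: ./find-class.sh <folder> <fully.qualified.ClassName>
folder=$1
class=$2
find "$folder" -name '*.jar' | while read -r jar; do
    if jar -tf "$jar" | grep -q "$class"; then
        echo "Found in: $jar"
    fi
done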

So we've checked one possible issue off our list, and can move on to the point below: is the JAR even getting picked up by Flink?

Did the JAR even get loaded?

There's something in Java called classloading, and you can read about how Flink does it here. From an end-user point of view, if you're trying to understand what's going on with a JAR file, one useful trick is to check whether it's even being loaded, and if it is, by what.

If you set JVM_ARGS to -verbose:class when you run the SQL Client, you can see what's going on. Couple it with putting your SQL statements in a file that you pass with -f, and you can dump an end-to-end run out to a log file to then poke through at your leisure:

JVM_ARGS=-verbose:class ./bin/sql-client.sh -f ../my_sql_statements.sql \
  > jvm_debug_stuff.log
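Once you've got the log, grep is your friend. For example, to see where anything Hadoop-related is being loaded from:

# Pick out class-load entries for a given package from the verbose:class log
grep 'class,load' jvm_debug_stuff.log | grep 'org.apache.hadoop'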

Here is an example of the kind of thing you might see, showing where a particular catalog is being loaded from:

[0.768s][info][class,load] org.apache.iceberg.flink.FlinkCatalog source: 
file:/Users/rmoff/flink/flink-1.18.1/lib/iceberg-flink-runtime-1.17-1.4.3.jar

In this example we can see that in our 1.18 installation of Flink, the FlinkCatalog is being loaded from a JAR intended for use with 1.17—which, as we discussed above, is often going to lead to problems, since versions should be kept in alignment.

Does it really need to be this complicated?

No, not if you use Decodable's fully managed platform. 🙂 We have Flink SQL and do all of the jarring-JAR fiddling for you. Give it a whirl today.

[1] I didn't play the lottery in 2023 😉

Robin Moffatt

Robin is a Principal DevEx Engineer at Decodable. He has been speaking at conferences since 2009 including QCon, Devoxx, Strata, Kafka Summit, and Øredev. You can find many of his talks online and his articles on the Decodable blog as well as his own blog.

Outside of work, Robin enjoys running, drinking good beer, and eating fried breakfasts—although generally not at the same time.
