August 27, 2024
6 min read

Adventures with Apache Flink and Delta Lake

By Robin Moffatt

Delta Lake (or Delta, as it’s often shortened to) is an open-source project from the Linux Foundation that’s primarily backed by Databricks. It’s an open table format (OTF) similar in concept to Apache Iceberg and Apache Hudi. Having previously dug into using Iceberg with both Apache Flink and Decodable, I wanted to see what it was like to use Delta with Flink—and specifically, Flink SQL.

The Delta Lake project provides a connector for Flink. In the link from the GitHub repository the connector is marked as <span class="inline-code">(Preview)</span>. This compounds the general sense that Apache Spark is, by far, the sole first-class citizen in this ecosystem when it comes to compute engines for processing. Given that Delta is backed by Databricks (whose whole platform stems from Spark), that’s not surprising. Other integrations, such as for the Trino query engine, do exist with apparently better support.

Just a tip before we begin: regardless of table format, if you’re using Flink you need to understand catalogs and catalog metastores before going any further. The good news for you is that I’ve written both a primer and detailed hands-on guide. Once you’ve understood catalogs, you’ll also want to have an appreciation of JAR management in Flink.

This blog post follows on from the one that prompted it, Understanding Flink and S3 configuration, since as you’ll see later this is the very nub of the challenge that I had getting Delta to work with Flink SQL. To begin with, I’ll give you the conclusion, and then work backwards from there.

tl;dr: Getting Flink to write to Delta Lake on S3

1. In <span class="inline-code">conf/flink-conf.yaml</span> configure your S3 authentication with both sets of prefixes:

flink.hadoop.fs.s3a.access.key: admin
flink.hadoop.fs.s3a.secret.key: password
flink.hadoop.fs.s3a.endpoint: http://localhost:9000
flink.hadoop.fs.s3a.path.style.access: true

fs.s3a.access.key: admin
fs.s3a.secret.key: password
fs.s3a.endpoint: http://localhost:9000
fs.s3a.path.style.access: true

Note that I’m using MinIO, which is S3-compatible. If you’re using real S3 then you shouldn’t need the <span class="inline-code">endpoint</span> and <span class="inline-code">path.style.access</span> configuration.

2. Install the Flink S3 filesystem plugin:

mkdir ./plugins/s3-fs-hadoop && \
cp ./opt/flink-s3-fs-hadoop-1.18.1.jar ./plugins/s3-fs-hadoop/

3. Add the following JARs to Flink’s <span class="inline-code">./lib</span> folder (there’s a download sketch just after the end of this list):

ls -l ./lib/delta
total 766016
-rw-r--r--  1 rmoff  staff  369799698  9 Jul 17:07 aws-java-sdk-bundle-1.12.648.jar
-rw-r--r--  1 rmoff  staff     213077  9 Jul 16:22 delta-flink-3.2.0.jar
-rw-r--r--  1 rmoff  staff   11151167  9 Jul 16:21 delta-standalone_2.12-3.2.0.jar
-rw-r--r--  1 rmoff  staff      24946  9 Jul 16:54 delta-storage-3.2.0.jar
-rw-r--r--  1 rmoff  staff    6740707  9 Jul 17:35 flink-sql-parquet-1.18.1.jar
-rw-r--r--  1 rmoff  staff     962685  9 Jul 16:59 hadoop-aws-3.3.4.jar
-rw-r--r--  1 rmoff  staff    3274833  9 Jul 17:31 shapeless_2.12-2.3.4.jar

4. Set the <span class="inline-code">HADOOP_CLASSPATH</span> environment variable to the classpath of a full Hadoop installation:

export HADOOP_CLASSPATH=$(~/hadoop/hadoop-3.3.4/bin/hadoop classpath)

-OR-

Add the following JARs to Flink’s <span class="inline-code">./lib</span> folder:

ls -l ./lib/hadoop
total 41840
-rw-r--r--  1 rmoff  staff   616888 10 Jul 16:13 commons-configuration2-2.1.1.jar
-rw-r--r--  1 rmoff  staff    62050 10 Jul 16:12 commons-logging-1.1.3.jar
-rw-r--r--  1 rmoff  staff  2747878 10 Jul 16:10 guava-27.0-jre.jar
-rw-r--r--  1 rmoff  staff   104441 10 Jul 16:14 hadoop-auth-3.3.4.jar
-rw-r--r--  1 rmoff  staff  4470571 10 Jul 16:00 hadoop-common-3.3.4.jar
-rw-r--r--  1 rmoff  staff  5501412 10 Jul 16:30 hadoop-hdfs-client-3.3.4.jar
-rw-r--r--  1 rmoff  staff  1636329 10 Jul 16:32 hadoop-mapreduce-client-core-3.3.4.jar
-rw-r--r--  1 rmoff  staff  3362359 10 Jul 16:10 hadoop-shaded-guava-1.1.1.jar
-rw-r--r--  1 rmoff  staff    75705 10 Jul 16:17 jackson-annotations-2.12.7.jar
-rw-r--r--  1 rmoff  staff   581860 10 Jul 16:05 jackson-core-2.17.1.jar
-rw-r--r--  1 rmoff  staff  1517276 10 Jul 16:17 jackson-databind-2.12.7.jar
-rw-r--r--  1 rmoff  staff   195909 10 Jul 16:02 stax2-api-4.2.1.jar
-rw-r--r--  1 rmoff  staff   522360 10 Jul 16:02 woodstox-core-5.3.0.jar

5. Now in the SQL client, create a Delta catalog and database within it:

CREATE CATALOG c_delta
    WITH ('type'         = 'delta-catalog',
          'catalog-type' = 'in-memory');

CREATE DATABASE c_delta.db_new;

then a table, and insert a row of data into it:

CREATE TABLE c_delta.db_new.t_foo (c1 VARCHAR,
                                   c2 INT)
    WITH ('connector'  = 'delta',
          'table-path' = 's3a://warehouse/');

INSERT INTO c_delta.db_new.t_foo
    VALUES ('a',42);

Done! Delta on Flink SQL. Read on below to find out the gory details of how I got here, and how you can troubleshoot errors that you might encounter on the way.
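
Steps 3 and 4 above reference a pile of JARs plus a full Hadoop distribution, and there’s nothing magic about where they come from. Here’s a rough sketch of how you might fetch them, assuming the standard Maven Central and Apache archive layouts; double-check the versions against your own Flink and Delta releases before relying on it.

cd ./lib
for gav in io.delta:delta-flink:3.2.0 \
           io.delta:delta-standalone_2.12:3.2.0 \
           io.delta:delta-storage:3.2.0 \
           org.apache.flink:flink-sql-parquet:1.18.1 \
           org.apache.hadoop:hadoop-aws:3.3.4 \
           com.amazonaws:aws-java-sdk-bundle:1.12.648 \
           com.chuusai:shapeless_2.12:2.3.4 ; do
  # Turn the group:artifact:version coordinate into a Maven Central URL
  g=$(echo "$gav" | cut -d: -f1 | tr '.' '/')
  a=$(echo "$gav" | cut -d: -f2)
  v=$(echo "$gav" | cut -d: -f3)
  curl -fLO "https://repo1.maven.org/maven2/${g}/${a}/${v}/${a}-${v}.jar"
done

# For step 4 (the HADOOP_CLASSPATH option), grab a full Hadoop distribution
# from the Apache archive and unpack it somewhere convenient
mkdir -p ~/hadoop
curl -fL https://archive.apache.org/dist/hadoop/common/hadoop-3.3.4/hadoop-3.3.4.tar.gz | tar -xz -C ~/hadoop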

Running it with Docker Compose

I’ve built out a self-contained stack so that you can experiment with running Flink SQL and Delta Lake together. It’s on the Decodable examples repository.

Clone this repository, and then bring up the stack and run the smoke-test SQL automagically to create and populate a Delta Lake table:

cd flink-delta-lake

docker compose down --remove-orphans  && \
docker volume prune -f                && \
docker network prune -f               && \
docker compose build                  && \
docker compose up -d                  && \
while ! nc -z localhost 8081; do sleep 1; done && \
docker compose exec -it jobmanager bash -c "./bin/sql-client.sh -f /data/delta-flink.sql"

This should result in a <span class="inline-code">t_foo</span> table written on S3 (MinIO), which you can verify with MinIO’s <span class="inline-code">mc</span> command:

$ docker exec mc bash -c "mc ls --recursive minio"
[2024-07-10 19:11:29 UTC]   776B STANDARD warehouse/t_foo/_delta_log/00000000000000000000.json
[2024-07-10 19:11:40 UTC]   652B STANDARD warehouse/t_foo/_delta_log/00000000000000000001.json
[2024-07-10 19:11:39 UTC]   428B STANDARD warehouse/t_foo/part-9ac74dad-4111-42d5-a287-915f1a0d3dc8-0.snappy.parquet

Reading the Delta Lake table with DuckDB

Unfortunately DuckDB’s delta extension doesn’t seem to support MinIO yet, but if I re-run the test against a real S3 bucket we can query the Delta Lake data successfully:

🟡◗ CREATE SECRET secret_rmoff (
          TYPE S3,
          KEY_ID 'xxxxx',
          SECRET 'yyyyyy',
          REGION 'us-west-2',
          SCOPE 's3://rmoff');

🟡◗ SELECT * FROM DELTA_SCAN('s3://rmoff/delta/t_foo');
100% ▕████████████████████████████████████████████████████████████▏
┌─────────┬───────┐
│   c1    │  c2   │
│ varchar │ int32 │
├─────────┼───────┤
│ Never   │    42 │
│ Gonna   │    42 │
│ Give    │    42 │
│ You     │    42 │
│ Up      │    42 │
└─────────┴───────┘
Run Time (s): real 7.059 user 0.354942 sys 1.599596
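
If you’d rather script that check than type it into the DuckDB prompt, here’s a minimal sketch assuming you have the DuckDB CLI installed and the delta extension available to it. The bucket, key, and secret are placeholders to swap for your own:

duckdb <<'SQL'
INSTALL delta;
LOAD delta;
-- Placeholder credentials and bucket; substitute your own
CREATE SECRET secret_rmoff (
    TYPE S3,
    KEY_ID 'xxxxx',
    SECRET 'yyyyyy',
    REGION 'us-west-2',
    SCOPE 's3://rmoff');
SELECT * FROM delta_scan('s3://rmoff/delta/t_foo');
SQL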

Getting from here…to there: a Flink/Delta Lake troubleshooting detective story 🔍

Above is what works, since that’s usually what people find most useful. But what can also be useful is understanding how I got there, since it can throw up some useful pointers for others experiencing the same problems, as well as a general lesson in methodical troubleshooting.

I started from a vanilla Flink 1.18.1 environment running on my local machine (no Docker Compose, yet) with just the Flink S3 filesystem plugin installed:

mkdir ./plugins/s3-fs-hadoop && \
cp ./opt/flink-s3-fs-hadoop-1.18.1.jar ./plugins/s3-fs-hadoop/

Configuration for the S3 plugin was added to the end of the <span class="inline-code">./conf/flink-conf.yaml</span> file. Since I’m using MinIO (an S3-compatible object store) the additional <span class="inline-code">endpoint</span> and <span class="inline-code">path.style.access</span> configuration is needed.

fs.s3a.access.key: admin
fs.s3a.secret.key: password
fs.s3a.endpoint: http://localhost:9000
fs.s3a.path.style.access: true

S3 storage was provided by MinIO in a standalone Docker container:

docker run --rm --detach \
           --name minio \
           -p 9001:9001 -p 9000:9000 \
           -e "MINIO_ROOT_USER=admin" \
           -e "MINIO_ROOT_PASSWORD=password" \
           minio/minio \
           server /data --console-address ":9001"

I created a bucket called warehouse:

docker exec -it minio bash -c "mc config host add minio http://localhost:9000 admin password"
docker exec -it minio bash -c "mc mb minio/warehouse"

Then I wrote a set of very simple SQL to create and populate a Delta Lake table based on the provided documentation:

delta-flink.sql

CREATE CATALOG c_delta
    WITH ('type'         = 'delta-catalog',
          'catalog-type' = 'in-memory');

CREATE DATABASE c_delta.db_new;

CREATE TABLE c_delta.db_new.t_foo (c1 VARCHAR,
                                   c2 INT)
    WITH ('connector'  = 'delta',
          'table-path' = 's3a://warehouse/');

INSERT INTO c_delta.db_new.t_foo
    VALUES ('a',42);

Then I ran the following for each test, which bounced the Flink cluster and cleared the log files each time, and then submitted the above SQL as a set of statements to run:

rm log/*.* && ./bin/start-cluster.sh                                     && \
    ./bin/sql-client.sh -f delta-flink.sql                               && \
    ./bin/stop-cluster.sh                                                && \
    ps -ef|grep java|grep flink|awk '{print $2}'|xargs -Ifoo kill -9 foo && \
    jps

Step 1: Add Delta JARs

Based loosely on the Delta Lake connector doc I started by adding Delta JARs to my Flink <span class="inline-code">./lib</span> folder:

delta-flink-3.2.0.jar
delta-standalone_2.12-3.2.0.jar

I also set detailed logging for several components including Delta by adding this to Flink’s <span class="inline-code">log4j.properties</span> and <span class="inline-code">log4j-cli.properties</span>:

logger.fs.name = org.apache.hadoop.fs
logger.fs.level = TRACE
logger.fs2.name = org.apache.flink.fs
logger.fs2.level = TRACE
logger.aws.name = com.amazonaws
logger.aws.level = TRACE
logger.delta.name = io.delta
logger.delta.level = TRACE

Trying it out for the first time…🤞

The very first statement failed:

CREATE CATALOG c_delta WITH ( 'type' = 'delta-catalog', 'catalog-type' = 'in-memory');

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration

This highlighted the need to set my Hadoop classpath for dependencies:

export HADOOP_CLASSPATH=$(~/hadoop/hadoop-3.3.4/bin/hadoop classpath)
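
If you want to sanity-check what that classpath actually resolves to (it’s a mix of directories and wildcard globs rather than individual JARs), something like this does the job:

# One entry per line is easier on the eyes than one giant colon-delimited string
~/hadoop/hadoop-3.3.4/bin/hadoop classpath | tr ':' '\n'

# And confirm the variable is set in the shell that will launch Flink
echo $HADOOP_CLASSPATH | tr ':' '\n' | head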

Bounce the stack, try again—and now creating the catalog worked, so create a database within it:

CREATE DATABASE c_delta.db_new;

Create a table:

Flink SQL> CREATE TABLE c_delta.db_new.t_foo (c1 varchar, c2 int) WITH ('connector' = 'delta',  'table-path' = 's3a://warehouse/');

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: io.delta.storage.LogStore

This class is part of <span class="inline-code">delta-storage</span>, so let’s add <span class="inline-code">delta-storage-3.2.0.jar</span> to the <span class="inline-code">./lib</span> folder and try again.
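
As a quick sanity check before bouncing the stack (and assuming the JAR is sitting in your current directory), you can confirm that the class from the error message really does live in the JAR you’re about to add:

# Look for io.delta.storage.LogStore inside the delta-storage JAR
jar tf delta-storage-3.2.0.jar | grep 'io/delta/storage/LogStore'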

(At this point I’m not going to repeat 'and then I bounced the stack'; take it as a given that after each thing I changed, I bounced the stack and reran the test, using the bash code I showed above.)

When is S3AFileSystem not S3AFileSystem?

Flink SQL> CREATE TABLE c_delta.db_new.t_foo (c1 varchar, c2 int)
WITH ('connector' = 'delta',  'table-path' = 's3a://warehouse/');

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found

Time to dig into the logs. For the above failure we find an entry in the <span class="inline-code">sql-client</span> log file; I guess that’s because the DDL (<span class="inline-code">CREATE TABLE</span>) runs directly from the SQL client, whilst DML (such as <span class="inline-code">INSERT</span>) runs on the job manager.

So for the above <span class="inline-code">CREATE TABLE</span> failure we get this log (with plenty of detail, because I increased logging to <span class="inline-code">TRACE</span> for the key components):

WARN  io.delta.flink.internal.table.HadoopUtils                    [] - Could not find Hadoop configuration via any of the supported methods (Flink configuration, environment variables).
DEBUG org.apache.hadoop.fs.FileSystem                              [] - Starting: Acquiring creator semaphore for s3a://warehouse/_delta_log
DEBUG org.apache.hadoop.fs.FileSystem                              [] - Acquiring creator semaphore for s3a://warehouse/_delta_log: duration 0:00.000s
DEBUG org.apache.hadoop.fs.FileSystem                              [] - Starting: Creating FS s3a://warehouse/_delta_log
DEBUG org.apache.hadoop.fs.FileSystem                              [] - Loading filesystems
DEBUG org.apache.hadoop.fs.FileSystem                              [] - file:// = class org.apache.hadoop.fs.LocalFileSystem from /Users/rmoff/hadoop/hadoop-3.3.4/share/hadoop/common/hadoop-common-3.3.4.jar
DEBUG org.apache.hadoop.fs.FileSystem                              [] - viewfs:// = class org.apache.hadoop.fs.viewfs.ViewFileSystem from /Users/rmoff/hadoop/hadoop-3.3.4/share/hadoop/common/hadoop-common-3.3.4.jar
DEBUG org.apache.hadoop.fs.FileSystem                              [] - har:// = class org.apache.hadoop.fs.HarFileSystem from /Users/rmoff/hadoop/hadoop-3.3.4/share/hadoop/common/hadoop-common-3.3.4.jar
DEBUG org.apache.hadoop.fs.FileSystem                              [] - http:// = class org.apache.hadoop.fs.http.HttpFileSystem from /Users/rmoff/hadoop/hadoop-3.3.4/share/hadoop/common/hadoop-common-3.3.4.jar
DEBUG org.apache.hadoop.fs.FileSystem                              [] - https:// = class org.apache.hadoop.fs.http.HttpsFileSystem from /Users/rmoff/hadoop/hadoop-3.3.4/share/hadoop/common/hadoop-common-3.3.4.jar
DEBUG org.apache.hadoop.fs.FileSystem                              [] - hdfs:// = class org.apache.hadoop.hdfs.DistributedFileSystem from /Users/rmoff/hadoop/hadoop-3.3.4/share/hadoop/hdfs/hadoop-hdfs-client-3.3.4.jar
DEBUG org.apache.hadoop.fs.FileSystem                              [] - webhdfs:// = class org.apache.hadoop.hdfs.web.WebHdfsFileSystem from /Users/rmoff/hadoop/hadoop-3.3.4/share/hadoop/hdfs/hadoop-hdfs-client-3.3.4.jar
DEBUG org.apache.hadoop.fs.FileSystem                              [] - swebhdfs:// = class org.apache.hadoop.hdfs.web.SWebHdfsFileSystem from /Users/rmoff/hadoop/hadoop-3.3.4/share/hadoop/hdfs/hadoop-hdfs-client-3.3.4.jar
DEBUG org.apache.hadoop.fs.FileSystem                              [] - Looking for FS supporting s3a
DEBUG org.apache.hadoop.fs.FileSystem                              [] - looking for configuration option fs.s3a.impl
DEBUG org.apache.hadoop.fs.FileSystem                              [] - Creating FS s3a://warehouse/_delta_log: duration 0:00.040s
ERROR org.apache.flink.table.gateway.service.operation.OperationManager [] - Failed to execute the operation 65c46b2b-a85e-4a12-90f1-c624e8eac65c.
org.apache.flink.table.api.TableException: Could not execute CreateTable in path `c_delta`.`db_new`.`t_foo`
    at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:1296) ~[flink-table-api-java-uber-1.18.1.jar:1.18.1]
[…]
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688) ~[hadoop-common-3.3.4.jar:?]
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431) ~[hadoop-common-3.3.4.jar:?]
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466) ~[hadoop-common-3.3.4.jar:?]
    at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174) ~[hadoop-common-3.3.4.jar:?]
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574) ~[hadoop-common-3.3.4.jar:?]
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521) ~[hadoop-common-3.3.4.jar:?]
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540) ~[hadoop-common-3.3.4.jar:?]
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) ~[hadoop-common-3.3.4.jar:?]
    at io.delta.standalone.internal.DeltaLogImpl$.apply(DeltaLogImpl.scala:260) ~[delta-standalone_2.12-3.2.0.jar:3.2.0]
    at io.delta.standalone.internal.DeltaLogImpl$.forTable(DeltaLogImpl.scala:241) ~[delta-standalone_2.12-3.2.0.jar:3.2.0]
    at io.delta.standalone.internal.DeltaLogImpl.forTable(DeltaLogImpl.scala) ~[delta-standalone_2.12-3.2.0.jar:3.2.0]
    at io.delta.standalone.DeltaLog.forTable(DeltaLog.java:164) ~[delta-standalone_2.12-3.2.0.jar:3.2.0]
    at io.delta.flink.internal.table.DeltaCatalog$1.load(DeltaCatalog.java:107) ~[delta-flink-3.2.0.jar:3.2.0]
    at io.delta.flink.internal.table.DeltaCatalog$1.load(DeltaCatalog.java:103) ~[delta-flink-3.2.0.jar:3.2.0]
    at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528) ~[guava-27.0-jre.jar:?]
    at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277) ~[guava-27.0-jre.jar:?]
    at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154) ~[guava-27.0-jre.jar:?]
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044) ~[guava-27.0-jre.jar:?]
    at com.google.common.cache.LocalCache.get(LocalCache.java:3952) ~[guava-27.0-jre.jar:?]
    at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[guava-27.0-jre.jar:?]
    at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958) ~[guava-27.0-jre.jar:?]
    at com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964) ~[guava-27.0-jre.jar:?]
    at io.delta.flink.internal.table.DeltaCatalog.getDeltaLogFromCache(DeltaCatalog.java:390) ~[delta-flink-3.2.0.jar:3.2.0]
    at io.delta.flink.internal.table.DeltaCatalog.createTable(DeltaCatalog.java:178) ~[delta-flink-3.2.0.jar:3.2.0]
    at io.delta.flink.internal.table.CatalogProxy.createTable(CatalogProxy.java:69) ~[delta-flink-3.2.0.jar:3.2.0]
    at org.apache.flink.table.catalog.CatalogManager.lambda$createTable$18(CatalogManager.java:957) ~[flink-table-api-java-uber-1.18.1.jar:1.18.1]
    at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:1290) ~[flink-table-api-java-uber-1.18.1.jar:1.18.1]
    ... 16 more

What’s really puzzling me here is that <span class="inline-code">S3AFileSystem</span> was found just fine on exactly the same deployment when I used it to write my previous article about S3 troubleshooting. All that’s changed is setting the Hadoop classpath, and adding the Delta JARs:

delta-flink-3.2.0.jar
delta-standalone_2.12-3.2.0.jar
delta-storage-3.2.0.jar

Let’s just try a write to S3 again—but not using Delta—just to make sure it still works:

Flink SQL> CREATE TABLE t_foo_fs (c1 varchar, c2 int)
           WITH (
            'connector' = 'filesystem',
            'path' = 's3a://warehouse/t_foo_fs/',
            'format' = 'json'
           );
[INFO] Execute statement succeed.

Flink SQL> INSERT INTO t_foo_fs VALUES ('a',42);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c19b4decafb1adc68e17c25741908a70

Flink SQL> SHOW JOBS;
+----------------------------------+-------------------------------------------------------+----------+-------------------------+
|                           job id |                                              job name |   status |              start time |
+----------------------------------+-------------------------------------------------------+----------+-------------------------+
| c19b4decafb1adc68e17c25741908a70 | insert-into_default_catalog.default_database.t_foo_fs | FINISHED | 2024-07-09T15:56:54.647 |
+----------------------------------+-------------------------------------------------------+----------+-------------------------+
1 row in set

So nothing wrong with our S3 configuration or dependencies there.

Back to Delta Lake: the Delta Storage docs say:

Delta Lake needs the <span class="inline-code">org.apache.hadoop.fs.s3a.S3AFileSystem</span> class from the <span class="inline-code">hadoop-aws</span> package

However, this class is in the existing <span class="inline-code">flink-s3-fs-hadoop-1.18.1.jar</span>:

$ jar tf flink-s3-fs-hadoop-1.18.1.jar|grep S3AFileSystem.class
org/apache/hadoop/fs/s3a/S3AFileSystem.class

But for whatever reason it isn’t being used. Let’s add <span class="inline-code">hadoop-aws</span> because the docs say so, and see if it helps:

$ ls -l lib/delta
total 24152
-rw-r--r--  1 rmoff  staff    213077  9 Jul 16:22 delta-flink-3.2.0.jar
-rw-r--r--  1 rmoff  staff  11151167  9 Jul 16:21 delta-standalone_2.12-3.2.0.jar
-rw-r--r--  1 rmoff  staff     24946  9 Jul 16:54 delta-storage-3.2.0.jar
-rw-r--r--  1 rmoff  staff    962685  9 Jul 16:59 hadoop-aws-3.3.4.jar

It seems to help but we’ve now got another JAR missing:

Flink SQL> CREATE TABLE c_delta.db_new.t_foo (c1 varchar, c2 int) WITH ('connector' = 'delta',  'table-path' = 's3a://warehouse/');

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: com.amazonaws.AmazonClientException

From past, bitter experience I know that I need the <span class="inline-code">aws-java-sdk-bundle</span> JAR here, so I added <span class="inline-code">aws-java-sdk-bundle-1.12.648.jar</span> to the <span class="inline-code">./lib</span> folder too.

S3 Authentication from Flink with Delta Lake

Things now move on past JAR files and onto other problems:

Flink SQL> CREATE TABLE c_delta.db_new.t_foo (c1 varchar, c2 int)
WITH ('connector' = 'delta',  'table-path' = 's3a://warehouse/');

[ERROR] Could not execute SQL statement. Reason:
com.amazonaws.SdkClientException: Unable to load AWS credentials from
environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and
AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))

Remember, in this same environment I have S3 working just fine. Credentials, endpoints, it’s all good. But switch to writing to S3 via the Delta connector and everything goes screwy.

Heading to the <span class="inline-code">sql-client</span> log file we can see that our existing <span class="inline-code">flink-conf.yaml</span> configuration for S3 just isn’t being used, as evidenced by the lack of a custom S3 endpoint being recognised (see my previous post for more detail):

DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Creating endpoint configuration for ""
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Using default endpoint -no need to generate a configuration

This is despite the configuration being logged as present earlier in the log:

INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: fs.s3a.path.style.access, true
INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: fs.s3a.access.key, admin
INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: fs.s3a.secret.key, ******
INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: fs.s3a.endpoint, http://localhost:9000

And thus, the <span class="inline-code">SimpleAWSCredentialsProvider</span> reports <span class="inline-code">No AWS credentials in the Hadoop configuration…</span>

DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList           [] - No credentials from SimpleAWSCredentialsProvider: org.apache.hadoop.fs.s3a.auth.NoAwsCredentialsException: SimpleAWSCredentialsProvider: No AWS credentials in the Hadoop configuration

…leaving authentication to fall back to the <span class="inline-code">EnvironmentVariableCredentialsProvider</span> (which also doesn’t work because I’ve not set any credentials in the environment variables):

DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList           [] - No credentials provided by EnvironmentVariableCredentialsProvider: com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))

There is a <span class="inline-code">WARN</span> log entry which might be relevant:

WARN  io.delta.flink.internal.table.HadoopUtils                    [] - Could not find Hadoop configuration via any of the supported methods (Flink configuration, environment variables).

This also ties in with the <span class="inline-code">No AWS credentials in the Hadoop configuration</span> message from <span class="inline-code">SimpleAWSCredentialsProvider</span>.

Looking at <span class="inline-code">io.delta.flink.internal.table.HadoopUtils</span> shows an interesting message:

This class was backported from Flink’s flink-hadoop-fs module, and it contains a subset of methods comparing to the original class. We kept only needed methods.

What’s more, it includes this:

FLINK_CONFIG_PREFIXES = {"flink.hadoop."};

I talked about <span class="inline-code">FLINK_CONFIG_PREFIXES</span> in my previous Flink S3 troubleshooting post. Taking a bit of a punt, let’s change our <span class="inline-code">flink-conf.yaml</span> and add this <span class="inline-code">flink.hadoop.</span> prefix:

flink.hadoop.fs.s3a.access.key: admin
flink.hadoop.fs.s3a.secret.key: password
flink.hadoop.fs.s3a.endpoint: http://localhost:9000
flink.hadoop.fs.s3a.path.style.access: true

After this, a different error!

But before we get to that, let’s just record for posterity what we’re now seeing in the log file:

INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: flink.hadoop.fs.s3a.access.key, admin
INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: flink.hadoop.fs.s3a.secret.key, ******
INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: flink.hadoop.fs.s3a.endpoint, http://localhost:9000
INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: flink.hadoop.fs.s3a.path.style.access, true
[…]
DEBUG io.delta.flink.internal.table.HadoopUtils                    [] - Adding Flink config entry for flink.hadoop.fs.s3a.secret.key as fs.s3a.secret.key=password to Hadoop config
DEBUG io.delta.flink.internal.table.HadoopUtils                    [] - Adding Flink config entry for flink.hadoop.fs.s3a.access.key as fs.s3a.access.key=admin to Hadoop config
DEBUG io.delta.flink.internal.table.HadoopUtils                    [] - Adding Flink config entry for flink.hadoop.fs.s3a.path.style.access as fs.s3a.path.style.access=true to Hadoop config
DEBUG io.delta.flink.internal.table.HadoopUtils                    [] - Adding Flink config entry for flink.hadoop.fs.s3a.endpoint as fs.s3a.endpoint=http://localhost:9000 to Hadoop config
[…]
DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList           [] - Using credentials from SimpleAWSCredentialsProvider

So, similar to (but different from 😡) the previous S3 configuration handling, what’s needed in the Flink configuration file isn’t the <span class="inline-code">fs.s3a.</span>-prefixed form that the <span class="inline-code">S3AFileSystem</span> module itself uses (and logs).
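
If you want a quick way to confirm that the prefixed entries are being picked up and translated, grepping the SQL client log for the <span class="inline-code">HadoopUtils</span> message works; the exact log file name varies by user and host, so adjust the glob to suit:

grep "Adding Flink config entry" log/*sql-client*.log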

More JARs Please

Anyway, back to our next error:

Flink SQL> CREATE TABLE c_delta.db_new.t_foo (c1 varchar, c2 int) WITH ('connector' = 'delta',  'table-path' = 's3a://warehouse/');

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: shapeless.Generic

Google wasn’t much help on this one, but I eventually found a line in the Delta build script referencing <span class="inline-code">com.chuusai</span>, which pointed me to shapeless_2.12-2.3.4.jar on Maven Central. Adding this to <span class="inline-code">./lib</span> did the trick!
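
For the next time a bare <span class="inline-code">ClassNotFoundException</span> shows up, a brute-force sketch like this can save some Googling; it hunts through a folder of candidate JARs for the missing class (adjust the path to wherever your JARs live):

for j in ./lib/*.jar; do
  if jar tf "$j" 2>/dev/null | grep -q 'shapeless/Generic'; then
    echo "shapeless.Generic found in $j"
  fi
done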

Success! (almost)

Flink SQL> CREATE TABLE c_delta.db_new.t_foo (c1 varchar, c2 int)
WITH ('connector' = 'delta',  'table-path' = 's3a://warehouse/');

[INFO] Execute statement succeed.

On MinIO there’s a Delta log file:

$ mc ls --recursive minio
[2024-07-09 16:32:16 UTC]   776B STANDARD warehouse/_delta_log/00000000000000000000.json

$ mc cat minio/warehouse/_delta_log/00000000000000000000.json

The contents look like this:

{
    "commitInfo": {
        "timestamp": 1720542736002,
        "operation": "CREATE TABLE",
        "operationParameters": {
            "partitionBy": "[]",
            "description": null,
            "properties": "{}",
            "isManaged": false
        },
        "isolationLevel": "SnapshotIsolation",
        "isBlindAppend": true,
        "operationMetrics": {},
        "engineInfo": "flink-engine/1.16.1-flink-delta-connector/3.2.0Delta-Standalone/3.2.0"
    }
}
{
    "protocol": {
        "minReaderVersion": 1,
        "minWriterVersion": 2
    }
}
{
    "metaData": {
        "id": "e2f4dcb8-d4c7-4456-b758-c140a58bc190",
        "name": "t_foo",
        "format": {
            "provider": "parquet",
            "options": {}
        },
        "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}",
        "partitionColumns": [],
        "configuration": {},
        "createdTime": 1720542735715
    }
}

Even More JARs Please

Now can we write data to our new Delta Lake table?

Flink SQL> INSERT INTO c_delta.db_new.t_foo VALUES ('a',42);

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.formats.parquet.row.ParquetRowDataBuilder

Nope. At least, not yet. Let’s chuck the <span class="inline-code">flink-sql-parquet</span> JAR into <span class="inline-code">./lib</span> and see if that helps.

It does help! It’s been submitted!

[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 1dbe63859fa78caf50bc93ed75861907

…and it’s failed:

Flink SQL> SHOW JOBS;
+----------------------------------+----------------------------------+--------+-------------------------+
|                           job id |                         job name | status |              start time |
+----------------------------------+----------------------------------+--------+-------------------------+
| 1dbe63859fa78caf50bc93ed75861907 | insert-into_c_delta.db_new.t_foo | FAILED | 2024-07-09T16:36:07.096 |
+----------------------------------+----------------------------------+--------+-------------------------+
1 row in set
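
As an aside, rather than (or as well as) digging through log files, you can ask Flink’s REST API why a job failed. A rough sketch, assuming the default REST port of 8081 and with <span class="inline-code">jq</span> purely for readability:

# Job ID taken from the SHOW JOBS output above; substitute your own
JOB_ID=1dbe63859fa78caf50bc93ed75861907

# List jobs and their status
curl -s http://localhost:8081/jobs | jq

# Inspect the exception(s) behind the failure
curl -s http://localhost:8081/jobs/${JOB_ID}/exceptions | jq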

S3 Authentication (again!)

Looking at the <span class="inline-code">taskexecutor</span> log file from the Flink task manager we can see a very familiar error:

org.apache.flink.runtime.taskmanager.Task                    [] - t_foo[2]: Writer -> t_foo[2]: Committer (1/1)#0
(bd4e4c0cb7f1772d6bdfdf702755dc6f_20ba6b65f97481d5570070de90e4e791_0_0) switched from RUNNING to FAILED with failure cause:
java.nio.file.AccessDeniedException: part-d2ef12d1-300c-4d3b-b5ce-72fdcc359bb7-0.snappy.parquet:
org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException:
No AWS Credentials provided by DynamicTemporaryAWSCredentialsProvider TemporaryAWSCredentialsProvider SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider IAMInstanceCredentialsProvider :
com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))

Now this is weird, because we have successfully written to S3 (MinIO) when we created the table, in the very same session. It’s there, in black and white (or whatever l33t colour scheme you have your terminal set to):

$ mc ls --recursive minio
[2024-07-09 16:32:16 UTC]   776B STANDARD warehouse/_delta_log/00000000000000000000.json

Looking through the taskexecutor log file in more detail I can also see successful interaction between the S3 client and MinIO requesting:

  1. <span class="inline-code">s3a://warehouse/_delta_log/_last_checkpoint</span> (which doesn’t exist, since we’ve only just created the table)
  2. <span class="inline-code">s3a://warehouse/_delta_log</span> (which doesn’t exist; it’s a 'folder', not an object)
  3. <span class="inline-code">s3a://warehouse</span> (which exists with an object below it <span class="inline-code">_delta_log/00000000000000000000.json</span>)
  4. etc.
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Getting path status for s3a://warehouse/_delta_log/_last_checkpoint  (_delta_log/_last_checkpoint); needEmptyDirectory=false
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - S3GetFileStatus s3a://warehouse/_delta_log/_last_checkpoint
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - HEAD _delta_log/_last_checkpoint with change tracker null
DEBUG com.amazonaws.request                                        [] - Sending Request: HEAD http://localhost:9000 /warehouse/_delta_log/_last_checkpoint
DEBUG com.amazonaws.request                                        [] - Received error response: com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 17E0D513A0355CEF; S3 Extended Request ID: dd9025
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Not Found: s3a://warehouse/_delta_log/_last_checkpoint
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Getting path status for s3a://warehouse/_delta_log  (_delta_log); needEmptyDirectory=false
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - S3GetFileStatus s3a://warehouse/_delta_log
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - HEAD _delta_log with change tracker null
DEBUG com.amazonaws.request                                        [] - Sending Request: HEAD http://localhost:9000 /warehouse/_delta_log
DEBUG com.amazonaws.request                                        [] - Received error response: com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 17E0D513A1419BC2; S3 Extended Request ID: dd9025
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - LIST List warehouse:/_delta_log/ delimiter=/ keys=2 requester pays=false
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Starting: LIST
DEBUG com.amazonaws.request                                        [] - Sending Request: GET http://localhost:9000 /warehouse/
DEBUG com.amazonaws.request                                        [] - Received successful response: 200, AWS Request ID: 17E0D513A1C2EDE8
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - LIST: duration 0:00.081s
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Found path as directory (with /)
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Prefix count = 0; object count=1
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Summary: _delta_log/00000000000000000000.json 776
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - List status for path: s3a://warehouse/_delta_log
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - listStatus: doing listObjects for directory _delta_log/
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - LIST List warehouse:/_delta_log/ delimiter=/ keys=5000 requester pays=false
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Starting: LIST
DEBUG com.amazonaws.request                                        [] - Sending Request: GET http://localhost:9000 /warehouse/
DEBUG com.amazonaws.request                                        [] - Received successful response: 200, AWS Request ID: 17E0D513A706C033
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - LIST: duration 0:00.008s
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - s3a://warehouse/_delta_log/00000000000000000000.json: _delta_log/00000000000000000000.json size=776
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Adding: S3AFileStatus{path=s3a://warehouse/_delta_log/00000000000000000000.json; isDirectory=false; length=776; replication=1; blocksize=33554432; modification_time=1720542736275; access_time=0; owner=rmoff; grou
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Added 1 entries; ignored 0; hasNext=true; hasMoreObjects=false

So S3 client and MinIO are clearly co-operating. But writing the actual data file (<span class="inline-code">part-d2ef12d1-300c-4d3b-b5ce-72fdcc359bb7-0.snappy.parquet</span>) fails. Searching in the log for the file name shows that the upload appears before the S3 interactions in the log that I’ve noted above:

INFO  org.apache.flink.runtime.taskmanager.Task                    [] - t_foo[2]: Writer -> t_foo[2]: Committer (1/1)#0 (bd4e4c0cb7f1772d6bdfdf702755dc6f_20ba6b65f97481d5570070de90e4e791_0_0) switched from INITIALIZING to RUNNING.
DEBUG io.delta.flink.sink.internal.writer.DeltaWriterBucket        [] - Opening new part file for bucket id= due to element org.apache.flink.table.data.binary.BinaryRowData@acbf1a5e.
DEBUG io.delta.flink.sink.internal.writer.DeltaWriterBucket        [] - Opening new part file "part-d2ef12d1-300c-4d3b-b5ce-72fdcc359bb7-0.snappy.parquet" for bucket id=.
DEBUG org.apache.hadoop.fs.s3a.WriteOperationHelper                [] - Initiating Multipart upload to part-d2ef12d1-300c-4d3b-b5ce-72fdcc359bb7-0.snappy.parquet
DEBUG org.apache.hadoop.fs.s3a.Invoker                             [] - Starting: initiate MultiPartUpload
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Initiate multipart upload to part-d2ef12d1-300c-4d3b-b5ce-72fdcc359bb7-0.snappy.parquet

Just underneath this bit is a log message which is concerning, knowing what I know now about successful S3 log messages: <span class="inline-code">SimpleAWSCredentialsProvider: No AWS credentials in the Hadoop configuration</span>

DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList           [] - No credentials from SimpleAWSCredentialsProvider: org.apache.hadoop.fs.s3a.auth.NoAwsCredentialsException: SimpleAWSCredentialsProvider: No AWS credentials in the Hadoop configuration

BUT just a few milliseconds later:

DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList           [] - Using credentials from SimpleAWSCredentialsProvider

Something is playing silly buggers here 🤔. Looking in the log further I can see similar conflicting messages, here about the endpoint configuration:

DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Creating endpoint configuration for ""
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Using default endpoint -no need to generate a configuration
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - fs.s3a.endpoint.region="us-east-1"
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Using default endpoint; setting region to us-east-1

DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Creating endpoint configuration for "http://localhost:9000"
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Endpoint URI = http://localhost:9000
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Endpoint http://localhost:9000 is not the default; parsing
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Region for endpoint http://localhost:9000, URI http://localhost:9000 is determined as null

It seems we have a Jekyll and Hyde scenario 😇 👿, with one playing nicely and using the config, and one doing its damnedest to ignore it 🤪.

Let’s try something random. Let’s add the original format of S3 config back into <span class="inline-code">flink-conf.yaml</span>, so it’s duplicated:

flink.hadoop.fs.s3a.access.key: admin
flink.hadoop.fs.s3a.secret.key: password
flink.hadoop.fs.s3a.endpoint: http://localhost:9000
flink.hadoop.fs.s3a.path.style.access: true

fs.s3a.access.key: admin
fs.s3a.secret.key: password
fs.s3a.endpoint: http://localhost:9000
fs.s3a.path.style.access: true

Reader, it worked.

Flink SQL> CREATE CATALOG c_delta WITH ( 'type' = 'delta-catalog', 'catalog-type' = 'in-memory');
[INFO] Execute statement succeed.

Flink SQL> CREATE DATABASE c_delta.db_new;
[INFO] Execute statement succeed.

Flink SQL> INSERT INTO c_delta.db_new.t_foo VALUES ('a',42);
[INFO] Execute statement succeed.

Flink SQL> SHOW JOBS;
+----------------------------------+----------------------------------+----------+-------------------------+
|                           job id |                         job name |   status |              start time |
+----------------------------------+----------------------------------+----------+-------------------------+
| 8333654c32374acff55c5bfa800738f7 | insert-into_c_delta.db_new.t_foo | FINISHED | 2024-07-10T11:58:49.687 |
+----------------------------------+----------------------------------+----------+-------------------------+
1 row in set

Flink SQL> SELECT * FROM c_delta.db_new.t_foo;
+----+--------------------------------+-------------+
| op |                             c1 |          c2 |
+----+--------------------------------+-------------+
| +I |                              a |          42 |
+----+--------------------------------+-------------+
Received a total of 1 rows

Troubleshooting: Will the real S3AFileSystem please stand up?

Since we’re into the realms of trying random shit, let’s think about the <span class="inline-code">S3AFileSystem</span> dependency. If we’re just writing vanilla JSON (for example) from Flink, we add the <span class="inline-code">flink-s3-fs-hadoop</span> JAR. But as we saw above, that wasn’t enough for Delta Lake, and we had to add <span class="inline-code">hadoop-aws</span>. In our working environment we therefore have both present. Perhaps one is doing some of the metadata work, whilst the other is handling the data writes themselves.

On that basis, let’s get rid of <span class="inline-code">flink-s3-fs-hadoop</span> temporarily, and test this hypothesis. Since the <span class="inline-code">CREATE TABLE</span> above failed without <span class="inline-code">hadoop-aws</span>, the hypothesis is that <span class="inline-code">flink-s3-fs-hadoop</span> is used for writing the data. If we remove it, the <span class="inline-code">CREATE TABLE</span> should work, and the <span class="inline-code">INSERT INTO</span> fail.

$ rm -r plugins/s3-fs-hadoop

Now we restart the Flink cluster, and try our test SQL statements again:

Flink SQL> CREATE CATALOG c_delta WITH ( 'type' = 'delta-catalog', 'catalog-type' = 'in-memory')
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE c_delta.db_new.t_foo (c1 varchar, c2 int)  WITH (  'connector' = 'delta',  'table-path' = 's3a://warehouse/')
[INFO] Execute statement succeed.

Flink SQL> INSERT INTO c_delta.db_new.t_foo VALUES ('a',42)
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3a'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.

There we go—it’s a double act, with <span class="inline-code">hadoop-aws</span> and <span class="inline-code">flink-s3-fs-hadoop</span> both responsible for writing to S3, but using different configuration parameters.

Finding the smoking gun

Having a bit of an itch to scratch here I wanted to find out more about the conflicting log messages. After a perusal of the log4j PatternLayout codes I amended the log4j.properties to use:

appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x tid:%tid [%-60threadName] - %m %n

Which then turned up the following—note the different <span class="inline-code">tid</span> values:

DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] tid:79 [t_foo[2]: Writer -> t_foo[2]: Committer (1/1)#0             ] - Creating endpoint configuration for ""
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] tid:79 [t_foo[2]: Writer -> t_foo[2]: Committer (1/1)#0             ] - Using default endpoint -no need to generate a configuration
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] tid:79 [t_foo[2]: Writer -> t_foo[2]: Committer (1/1)#0             ] - fs.s3a.endpoint.region="us-east-1"
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] tid:79 [t_foo[2]: Writer -> t_foo[2]: Committer (1/1)#0             ] - Using default endpoint; setting region to us-east-1

DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] tid:80 [t_foo[2]: Global Committer (1/1)#0                          ] - Creating endpoint configuration for "http://localhost:9000"
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] tid:80 [t_foo[2]: Global Committer (1/1)#0                          ] - Endpoint URI = http://localhost:9000
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] tid:80 [t_foo[2]: Global Committer (1/1)#0                          ] - Endpoint http://localhost:9000 is not the default; parsing
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] tid:80 [t_foo[2]: Global Committer (1/1)#0                          ] - Region for endpoint http://localhost:9000, URI http://localhost:9000 is determined as null

DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList           [] tid:79 [t_foo[2]: Writer -> t_foo[2]: Committer (1/1)#0             ] - No credentials from SimpleAWSCredentialsProvider: org.apache.hadoop.fs.s3a.auth.NoAwsCredentialsException: SimpleAWSCredentialsProvider: No AWS credentials in the Hadoop configuration

DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList           [] tid:80 [t_foo[2]: Global Committer (1/1)#0                          ] - Using credentials from SimpleAWSCredentialsProvider

So there are two threads: a <span class="inline-code">Global Committer</span> and a <span class="inline-code">Writer</span> (for table <span class="inline-code">t_foo</span>). As we saw above, writes to the table (<span class="inline-code">Writer</span> thread) seem to be done using <span class="inline-code">S3AFileSystem</span> from <span class="inline-code">flink-s3-fs-hadoop</span>, whilst those for the metadata (<span class="inline-code">Global Committer</span>) use <span class="inline-code">S3AFileSystem</span> from <span class="inline-code">hadoop-aws</span>.
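
If you want to reproduce that correlation yourself, and assuming you’ve amended the PatternLayout as above so that thread names appear in the log, a grep along these lines pulls out the relevant lines from the task executor log (file names vary by user and host):

grep -E "DefaultS3ClientFactory|AWSCredentialProviderList" log/*taskexecutor*.log \
  | grep -E "Writer|Global Committer"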

Robin Moffatt

Robin is a Principal DevEx Engineer at Decodable. He has been speaking at conferences since 2009 including QCon, Devoxx, Strata, Kafka Summit, and Øredev. You can find many of his talks online and his articles on the Decodable blog as well as his own blog.

Outside of work, Robin enjoys running, drinking good beer, and eating fried breakfasts—although generally not at the same time.

Delta Lake (or Delta, as it’s often shortened to) is an open-source project from the Linux Foundation that’s primarily backed by Databricks. It’s an open table format (OTF) similar in concept to Apache Iceberg and Apache Hudi. Having previously dug into using Iceberg with both Apache Flink and Decodable, I wanted to see what it was like to use Delta with Flink—and specifically, Flink SQL.

The Delta Lake project provides a connector for Flink. In the link from the GitHub repository the connector is marked as <span class="inline-code">(Preview)</span>. This compounds the general sense that Apache Spark is, by far, the sole first-class citizen in this ecosystem when it comes to compute engines for processing. Which given that Delta is backed by Databricks (whose whole platform stems from Spark) is not unsurprising. Other integrations for query engines such as Trino do exist with apparently better support.

Just a tip before we begin: regardless of table format, if you’re using Flink you need to understand catalogs and catalog metastores before going any further. The good news for you is that I’ve written both a primer and detailed hands-on guide. Once you’ve understood catalogs, you’ll also want to have an appreciation of JAR management in Flink.

This blog post follows on from the one that prompted it, Understanding Flink and S3 configuration, since as you’ll see later this is the very nub of the challenge that I had getting Delta to work with Flink SQL. To begin with, I’ll give you the conclusion, and then work backwards from there.

tl;dr: Getting Flink to write to Delta Lake on S3

1. In <span class="inline-code">conf/flink-conf.yaml</span> configure your S3 authentication with both sets of prefixes:

flink.hadoop.fs.s3a.access.key: admin
flink.hadoop.fs.s3a.secret.key: password
flink.hadoop.fs.s3a.endpoint: http://localhost:9000
flink.hadoop.fs.s3a.path.style.access: true

fs.s3a.access.key: admin
fs.s3a.secret.key: password
fs.s3a.endpoint: http://localhost:9000
fs.s3a.path.style.access: true

Note that I’m using MinIO, which is S3-compatible. If you’re using real S3 then you oughtn’t and need the <span class="inline-code">endpoint</span> and <span class="inline-code">path.style.access</span> configuration.

2. Install the Flink S3 filesystem plugin:

mkdir ./plugins/s3-fs-hadoop && \
cp ./opt/flink-s3-fs-hadoop-1.18.1.jar ./plugins/s3-fs-hadoop/

3. Add the following JARs to Flink’s <span class="inline-code">./lib</span> folder:

ls -l ./lib/delta
total 766016
-rw-r--r--  1 rmoff  staff  369799698  9 Jul 17:07 aws-java-sdk-bundle-1.12.648.jar
-rw-r--r--  1 rmoff  staff     213077  9 Jul 16:22 delta-flink-3.2.0.jar
-rw-r--r--  1 rmoff  staff   11151167  9 Jul 16:21 delta-standalone_2.12-3.2.0.jar
-rw-r--r--  1 rmoff  staff      24946  9 Jul 16:54 delta-storage-3.2.0.jar
-rw-r--r--  1 rmoff  staff    6740707  9 Jul 17:35 flink-sql-parquet-1.18.1.jar
-rw-r--r--  1 rmoff  staff     962685  9 Jul 16:59 hadoop-aws-3.3.4.jar
-rw-r--r--  1 rmoff  staff    3274833  9 Jul 17:31 shapeless_2.12-2.3.4.jar

4. Set the environment variable <span class="inline-code">HADOOP_CLASSPATH</span> to a full Hadoop installation:

export HADOOP_CLASSPATH=$(~/hadoop/hadoop-3.3.4/bin/hadoop classpath)

-OR-

Add the followings JARs to Flink’s <span class="inline-code">./lib</span> folder:

ls -l ./lib/hadoop
total 41840
-rw-r--r--  1 rmoff  staff   616888 10 Jul 16:13 commons-configuration2-2.1.1.jar
-rw-r--r--  1 rmoff  staff    62050 10 Jul 16:12 commons-logging-1.1.3.jar
-rw-r--r--  1 rmoff  staff  2747878 10 Jul 16:10 guava-27.0-jre.jar
-rw-r--r--  1 rmoff  staff   104441 10 Jul 16:14 hadoop-auth-3.3.4.jar
-rw-r--r--  1 rmoff  staff  4470571 10 Jul 16:00 hadoop-common-3.3.4.jar
-rw-r--r--  1 rmoff  staff  5501412 10 Jul 16:30 hadoop-hdfs-client-3.3.4.jar
-rw-r--r--  1 rmoff  staff  1636329 10 Jul 16:32 hadoop-mapreduce-client-core-3.3.4.jar
-rw-r--r--  1 rmoff  staff  3362359 10 Jul 16:10 hadoop-shaded-guava-1.1.1.jar
-rw-r--r--  1 rmoff  staff    75705 10 Jul 16:17 jackson-annotations-2.12.7.jar
-rw-r--r--  1 rmoff  staff   581860 10 Jul 16:05 jackson-core-2.17.1.jar
-rw-r--r--  1 rmoff  staff  1517276 10 Jul 16:17 jackson-databind-2.12.7.jar
-rw-r--r--  1 rmoff  staff   195909 10 Jul 16:02 stax2-api-4.2.1.jar
-rw-r--r--  1 rmoff  staff   522360 10 Jul 16:02 woodstox-core-5.3.0.jar

5. Now in the SQL client, create a Delta catalog and database within it:

CREATE CATALOG c_delta
    WITH ('type'         = 'delta-catalog',
          'catalog-type' = 'in-memory');

CREATE DATABASE c_delta.db_new;

then a table, and insert a row of data into it:

CREATE TABLE c_delta.db_new.t_foo (c1 VARCHAR,
                                   c2 INT)
    WITH ('connector'  = 'delta',
          'table-path' = 's3a://warehouse/');

INSERT INTO c_delta.db_new.t_foo
    VALUES ('a',42);

Done! Delta on Flink SQL. Read on below to find out the gorey details of how I got here, and how you can troubleshoot errors that you might encounter on the way.

Running it with Docker Compose

I’ve built out a self-contained stack so that you can experiment with running Flink SQL and Delta Lake together. It’s on the Decodable examples repository.

Clone this repository, and then bring up the stack and run the smoke-test SQL automagically to create and populate a Delta Lake table:

cd flink-delta-lake

docker compose down --remove-orphans  && \
docker volume prune -f                && \
docker network prune -f               && \
docker compose build                  && \
docker compose up -d                  && \
while ! nc -z localhost 8081; do sleep 1; done && \
docker compose exec -it jobmanager bash -c "./bin/sql-client.sh -f /data/delta-flink.sql"

This should result in a <span class="inline-code">t_foo</span> table written on S3 (MinIO), which you can verify with MinIO’s <span class="inline-code">mc</span> command:

$ docker exec mc bash -c "mc ls --recursive minio"
[2024-07-10 19:11:29 UTC]   776B STANDARD warehouse/t_foo/_delta_log/00000000000000000000.json
[2024-07-10 19:11:40 UTC]   652B STANDARD warehouse/t_foo/_delta_log/00000000000000000001.json
[2024-07-10 19:11:39 UTC]   428B STANDARD warehouse/t_foo/part-9ac74dad-4111-42d5-a287-915f1a0d3dc8-0.snappy.parquet

Reading the Delta Lake table with DuckDB

Unfortunately DuckDB Delta doesn’t seem to yet support MinIO, but if I re-run the test against a real S3 bucket we can query the Delta Lake data successfully:

🟡◗ CREATE SECRET secret_rmoff (
          TYPE S3,
          KEY_ID 'xxxxx',
          SECRET 'yyyyyy',
          REGION 'us-west-2',
          SCOPE 's3://rmoff');

🟡◗ SELECT * FROM DELTA_SCAN('s3://rmoff/delta/t_foo');
100% ▕████████████████████████████████████████████████████████████▏
┌─────────┬───────┐
│   c1    │  c2   │
│ varchar │ int32 │
├─────────┼───────┤
│ Never   │    42 │
│ Gonna   │    42 │
│ Give    │    42 │
│ You     │    42 │
│ Up      │    42 │
└─────────┴───────┘
Run Time (s): real 7.059 user 0.354942 sys 1.599596

Getting from here…to there: a Flink/Delta Lake troubleshooting detective story 🔍

Above is what works, since that’s usually what people find most useful. But what can also be useful is understanding how I got there, since it can throw up some useful pointers for others experiencing the same problems, as well as a general lesson in methodical troubleshooting.

I started from a vanilla Flink 1.18.1 environment running on my local machine (no Docker Compose, yet) with just the Flink S3 filesystem plugin installed:

mkdir ./plugins/s3-fs-hadoop && \
cp ./opt/flink-s3-fs-hadoop-1.18.1.jar ./plugins/s3-fs-hadoop/

Configuration for the S3 plugin was added to the end of the <span class="inline-code">./conf/flink-conf.yaml</span> file. Since I’m using MinIO (an S3-compatible object store) the additional <span class="inline-code">endpoint</span> and <span class="inline-code">path.style.access</span> configuration is needed.

fs.s3a.access.key: admin
fs.s3a.secret.key: password
fs.s3a.endpoint: http://localhost:9000
fs.s3a.path.style.access: true

S3 storage was provided by MinIO in a standalone Docker container:

docker run --rm --detach \
           --name minio \
           -p 9001:9001 -p 9000:9000 \
           -e "MINIO_ROOT_USER=admin" \
           -e "MINIO_ROOT_PASSWORD=password" \
           minio/minio \
           server /data --console-address ":9001"

I created a bucket called warehouse:

docker exec -it minio bash -c "mc config host add minio http://localhost:9000 admin password"
docker exec -it minio bash -c "mc mb minio/warehouse"

Then I wrote a set of very simple SQL to create and populate a Delta Lake table based on the provided documentation:

delta-flink.sql

CREATE CATALOG c_delta
    WITH ('type'         = 'delta-catalog',
          'catalog-type' = 'in-memory');

CREATE DATABASE c_delta.db_new;

CREATE TABLE c_delta.db_new.t_foo (c1 VARCHAR,
                                   c2 INT)
    WITH ('connector'  = 'delta',
          'table-path' = 's3a://warehouse/');

INSERT INTO c_delta.db_new.t_foo
    VALUES ('a',42);

For each test I then ran the following, which cleared the log files, bounced the Flink cluster, and submitted the above SQL as a set of statements to run:

rm log/*.* && ./bin/start-cluster.sh                                     && \
    ./bin/sql-client.sh -f delta-flink.sql                               && \
    ./bin/stop-cluster.sh                                                && \
    ps -ef|grep java|grep flink|awk '{print $2}'|xargs -Ifoo kill -9 foo && \
    jps

Step 1: Add Delta JARs

Based loosely on the Delta Lake connector doc I started by adding Delta JARs to my Flink <span class="inline-code">./lib</span> folder:

delta-flink-3.2.0.jar
delta-standalone_2.12-3.2.0.jar
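Both JARs are published on Maven Central. If you’re following along, here’s a sketch of pulling them down with <span class="inline-code">curl</span>; the URLs assume the standard Maven repository layout, and I’m dropping them into a <span class="inline-code">lib/delta</span> subfolder as shown later:

mkdir -p ./lib/delta && cd ./lib/delta
curl -O https://repo1.maven.org/maven2/io/delta/delta-flink/3.2.0/delta-flink-3.2.0.jar
curl -O https://repo1.maven.org/maven2/io/delta/delta-standalone_2.12/3.2.0/delta-standalone_2.12-3.2.0.jar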

I also set detailed logging for several components including Delta by adding this to Flink’s <span class="inline-code">log4j.properties</span> and <span class="inline-code">log4j-cli.properties</span>:

logger.fs.name = org.apache.hadoop.fs
logger.fs.level = TRACE
logger.fs2.name = org.apache.flink.fs
logger.fs2.level = TRACE
logger.aws.name = com.amazonaws
logger.aws.level = TRACE
logger.delta.name = io.delta
logger.delta.level = TRACE

Trying it out for the first time…🤞

The very first statement failed:

CREATE CATALOG c_delta WITH ( 'type' = 'delta-catalog', 'catalog-type' = 'in-memory');

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration

This highlighted the need to set my Hadoop classpath for dependencies:

export HADOOP_CLASSPATH=$(~/hadoop/hadoop-3.3.4/bin/hadoop classpath)
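If you don’t have a Hadoop installation lying around to point this at, the binary distribution is enough. A sketch, assuming the Apache archive layout and the same paths as above:

mkdir -p ~/hadoop && cd ~/hadoop
curl -O https://archive.apache.org/dist/hadoop/common/hadoop-3.3.4/hadoop-3.3.4.tar.gz
tar -xzf hadoop-3.3.4.tar.gz
# then set HADOOP_CLASSPATH as shown above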

Bounce the stack, try again—and now creating the catalog worked, so create a database within it:

CREATE DATABASE c_delta.db_new;

Create a table:

Flink SQL> CREATE TABLE c_delta.db_new.t_foo (c1 varchar, c2 int) WITH ('connector' = 'delta',  'table-path' = 's3a://warehouse/');

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: io.delta.storage.LogStore

This class is part of <span class="inline-code">delta-storage</span>, so let’s add <span class="inline-code">delta-storage-3.2.0.jar</span> to the <span class="inline-code">./lib</span> folder and try again.

(At this point I’m not going to repeat 'and then I bounced the stack'; take it as a given that after each thing I changed, I bounced the stack and reran the test, using the bash code I showed above.)

When is S3AFileSystem not S3AFileSystem?

Flink SQL> CREATE TABLE c_delta.db_new.t_foo (c1 varchar, c2 int)
WITH ('connector' = 'delta',  'table-path' = 's3a://warehouse/');

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found

Time to dig into the logs. For the above failure we find an entry in the <span class="inline-code">sql-client</span> log file; I guess this is because DDL (<span class="inline-code">CREATE TABLE</span>) runs directly from the SQL client, whilst DML (such as <span class="inline-code">INSERT</span>) runs on the job manager.

So for the above <span class="inline-code">CREATE TABLE</span> failure we get this log (with plenty of detail, because I increased logging to <span class="inline-code">TRACE</span> for the key components):

WARN  io.delta.flink.internal.table.HadoopUtils                    [] - Could not find Hadoop configuration via any of the supported methods (Flink configuration, environment variables).
DEBUG org.apache.hadoop.fs.FileSystem                              [] - Starting: Acquiring creator semaphore for s3a://warehouse/_delta_log
DEBUG org.apache.hadoop.fs.FileSystem                              [] - Acquiring creator semaphore for s3a://warehouse/_delta_log: duration 0:00.000s
DEBUG org.apache.hadoop.fs.FileSystem                              [] - Starting: Creating FS s3a://warehouse/_delta_log
DEBUG org.apache.hadoop.fs.FileSystem                              [] - Loading filesystems
DEBUG org.apache.hadoop.fs.FileSystem                              [] - file:// = class org.apache.hadoop.fs.LocalFileSystem from /Users/rmoff/hadoop/hadoop-3.3.4/share/hadoop/common/hadoop-common-3.3.4.jar
DEBUG org.apache.hadoop.fs.FileSystem                              [] - viewfs:// = class org.apache.hadoop.fs.viewfs.ViewFileSystem from /Users/rmoff/hadoop/hadoop-3.3.4/share/hadoop/common/hadoop-common-3.3.4.jar
DEBUG org.apache.hadoop.fs.FileSystem                              [] - har:// = class org.apache.hadoop.fs.HarFileSystem from /Users/rmoff/hadoop/hadoop-3.3.4/share/hadoop/common/hadoop-common-3.3.4.jar
DEBUG org.apache.hadoop.fs.FileSystem                              [] - http:// = class org.apache.hadoop.fs.http.HttpFileSystem from /Users/rmoff/hadoop/hadoop-3.3.4/share/hadoop/common/hadoop-common-3.3.4.jar
DEBUG org.apache.hadoop.fs.FileSystem                              [] - https:// = class org.apache.hadoop.fs.http.HttpsFileSystem from /Users/rmoff/hadoop/hadoop-3.3.4/share/hadoop/common/hadoop-common-3.3.4.jar
DEBUG org.apache.hadoop.fs.FileSystem                              [] - hdfs:// = class org.apache.hadoop.hdfs.DistributedFileSystem from /Users/rmoff/hadoop/hadoop-3.3.4/share/hadoop/hdfs/hadoop-hdfs-client-3.3.4.jar
DEBUG org.apache.hadoop.fs.FileSystem                              [] - webhdfs:// = class org.apache.hadoop.hdfs.web.WebHdfsFileSystem from /Users/rmoff/hadoop/hadoop-3.3.4/share/hadoop/hdfs/hadoop-hdfs-client-3.3.4.jar
DEBUG org.apache.hadoop.fs.FileSystem                              [] - swebhdfs:// = class org.apache.hadoop.hdfs.web.SWebHdfsFileSystem from /Users/rmoff/hadoop/hadoop-3.3.4/share/hadoop/hdfs/hadoop-hdfs-client-3.3.4.jar
DEBUG org.apache.hadoop.fs.FileSystem                              [] - Looking for FS supporting s3a
DEBUG org.apache.hadoop.fs.FileSystem                              [] - looking for configuration option fs.s3a.impl
DEBUG org.apache.hadoop.fs.FileSystem                              [] - Creating FS s3a://warehouse/_delta_log: duration 0:00.040s
ERROR org.apache.flink.table.gateway.service.operation.OperationManager [] - Failed to execute the operation 65c46b2b-a85e-4a12-90f1-c624e8eac65c.
org.apache.flink.table.api.TableException: Could not execute CreateTable in path `c_delta`.`db_new`.`t_foo`
    at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:1296) ~[flink-table-api-java-uber-1.18.1.jar:1.18.1]
[…]
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688) ~[hadoop-common-3.3.4.jar:?]
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431) ~[hadoop-common-3.3.4.jar:?]
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466) ~[hadoop-common-3.3.4.jar:?]
    at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174) ~[hadoop-common-3.3.4.jar:?]
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574) ~[hadoop-common-3.3.4.jar:?]
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521) ~[hadoop-common-3.3.4.jar:?]
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540) ~[hadoop-common-3.3.4.jar:?]
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) ~[hadoop-common-3.3.4.jar:?]
    at io.delta.standalone.internal.DeltaLogImpl$.apply(DeltaLogImpl.scala:260) ~[delta-standalone_2.12-3.2.0.jar:3.2.0]
    at io.delta.standalone.internal.DeltaLogImpl$.forTable(DeltaLogImpl.scala:241) ~[delta-standalone_2.12-3.2.0.jar:3.2.0]
    at io.delta.standalone.internal.DeltaLogImpl.forTable(DeltaLogImpl.scala) ~[delta-standalone_2.12-3.2.0.jar:3.2.0]
    at io.delta.standalone.DeltaLog.forTable(DeltaLog.java:164) ~[delta-standalone_2.12-3.2.0.jar:3.2.0]
    at io.delta.flink.internal.table.DeltaCatalog$1.load(DeltaCatalog.java:107) ~[delta-flink-3.2.0.jar:3.2.0]
    at io.delta.flink.internal.table.DeltaCatalog$1.load(DeltaCatalog.java:103) ~[delta-flink-3.2.0.jar:3.2.0]
    at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528) ~[guava-27.0-jre.jar:?]
    at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277) ~[guava-27.0-jre.jar:?]
    at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154) ~[guava-27.0-jre.jar:?]
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044) ~[guava-27.0-jre.jar:?]
    at com.google.common.cache.LocalCache.get(LocalCache.java:3952) ~[guava-27.0-jre.jar:?]
    at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[guava-27.0-jre.jar:?]
    at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958) ~[guava-27.0-jre.jar:?]
    at com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964) ~[guava-27.0-jre.jar:?]
    at io.delta.flink.internal.table.DeltaCatalog.getDeltaLogFromCache(DeltaCatalog.java:390) ~[delta-flink-3.2.0.jar:3.2.0]
    at io.delta.flink.internal.table.DeltaCatalog.createTable(DeltaCatalog.java:178) ~[delta-flink-3.2.0.jar:3.2.0]
    at io.delta.flink.internal.table.CatalogProxy.createTable(CatalogProxy.java:69) ~[delta-flink-3.2.0.jar:3.2.0]
    at org.apache.flink.table.catalog.CatalogManager.lambda$createTable$18(CatalogManager.java:957) ~[flink-table-api-java-uber-1.18.1.jar:1.18.1]
    at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:1290) ~[flink-table-api-java-uber-1.18.1.jar:1.18.1]
    ... 16 more

What’s really puzzling me here is that <span class="inline-code">S3AFileSystem</span> was found just fine on exactly the same deployment when I used it to write my previous article about S3 troubleshooting. All that’s changed is setting the Hadoop classpath, and adding the Delta JARs:

delta-flink-3.2.0.jar
delta-standalone_2.12-3.2.0.jar
delta-storage-3.2.0.jar

Let’s just try a write to S3 again—but not using Delta—just to make sure it still works:

Flink SQL> CREATE TABLE t_foo_fs (c1 varchar, c2 int)
           WITH (
            'connector' = 'filesystem',
            'path' = 's3a://warehouse/t_foo_fs/',
            'format' = 'json'
           );
[INFO] Execute statement succeed.

Flink SQL> INSERT INTO t_foo_fs VALUES ('a',42);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c19b4decafb1adc68e17c25741908a70

Flink SQL> SHOW JOBS;
+----------------------------------+-------------------------------------------------------+----------+-------------------------+
|                           job id |                                              job name |   status |              start time |
+----------------------------------+-------------------------------------------------------+----------+-------------------------+
| c19b4decafb1adc68e17c25741908a70 | insert-into_default_catalog.default_database.t_foo_fs | FINISHED | 2024-07-09T15:56:54.647 |
+----------------------------------+-------------------------------------------------------+----------+-------------------------+
1 row in set

So nothing wrong with our S3 configuration or dependencies there.

Back to Delta Lake: looking at the Delta Storage docs, they say:

Delta Lake needs the <span class="inline-code">org.apache.hadoop.fs.s3a.S3AFileSystem</span> class from the <span class="inline-code">hadoop-aws</span> package

However, this class is in the existing <span class="inline-code">flink-s3-fs-hadoop-1.18.1.jar</span>:

$ jar tf flink-s3-fs-hadoop-1.18.1.jar|grep S3AFileSystem.class
org/apache/hadoop/fs/s3a/S3AFileSystem.class

But for whatever reason it isn’t being used. Let’s add <span class="inline-code">hadoop-aws</span> because the docs say so, and see if it helps:

$ ls -l lib/delta
total 24152
-rw-r--r--  1 rmoff  staff    213077  9 Jul 16:22 delta-flink-3.2.0.jar
-rw-r--r--  1 rmoff  staff  11151167  9 Jul 16:21 delta-standalone_2.12-3.2.0.jar
-rw-r--r--  1 rmoff  staff     24946  9 Jul 16:54 delta-storage-3.2.0.jar
-rw-r--r--  1 rmoff  staff    962685  9 Jul 16:59 hadoop-aws-3.3.4.jar

It seems to help but we’ve now got another JAR missing:

Flink SQL> CREATE TABLE c_delta.db_new.t_foo (c1 varchar, c2 int) WITH ('connector' = 'delta',  'table-path' = 's3a://warehouse/');

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: com.amazonaws.AmazonClientException

From past, bitter, experience I know that I need the <span class="inline-code">aws-java-sdk-bundle</span> JAR here, so add <span class="inline-code">aws-java-sdk-bundle-1.12.648.jar</span> to the <span class="inline-code">./lib</span> folder too.
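If you don’t have that bitter experience to draw on, a quick way to work out which JAR provides a missing class is to grep the JAR listings. A sketch, assuming you have a directory of candidate JARs to search (the path here is made up):

# print each JAR that contains the missing class
CLASS=com/amazonaws/AmazonClientException.class
for j in ~/candidate-jars/*.jar; do
  jar tf "$j" | grep -q "$CLASS" && echo "$j"
done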

S3 Authentication from Flink with Delta Lake

Things now move on past JAR files and onto other problems:

Flink SQL> CREATE TABLE c_delta.db_new.t_foo (c1 varchar, c2 int)
WITH ('connector' = 'delta',  'table-path' = 's3a://warehouse/');

[ERROR] Could not execute SQL statement. Reason:
com.amazonaws.SdkClientException: Unable to load AWS credentials from
environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and
AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))

Remember, in this same environment I have S3 working just fine. Credentials, endpoints, it’s all good. But switch to writing to S3 via the Delta connector and everything goes screwy.

Heading to the <span class="inline-code">sql-client</span> log file we can see that our existing <span class="inline-code">flink-conf.yaml</span> configuration for S3 just isn’t being used, as evidenced by the lack of a custom S3 endpoint being recognised (see my previous post for more detail):

DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Creating endpoint configuration for ""
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Using default endpoint -no need to generate a configuration

This is despite the configuration being logged as present earlier in the log:

INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: fs.s3a.path.style.access, true
INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: fs.s3a.access.key, admin
INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: fs.s3a.secret.key, ******
INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: fs.s3a.endpoint, http://localhost:9000

And thus, the <span class="inline-code">SimpleAWSCredentialsProvider</span> reports <span class="inline-code">No AWS credentials in the Hadoop configuration…</span>

DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList           [] - No credentials from SimpleAWSCredentialsProvider: org.apache.hadoop.fs.s3a.auth.NoAwsCredentialsException: SimpleAWSCredentialsProvider: No AWS credentials in the Hadoop configuration

…leaving authentication to fall back to the <span class="inline-code">EnvironmentVariableCredentialsProvider</span> (which also doesn’t work, because I’ve not set any credentials in the environment variables):

DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList           [] - No credentials provided by EnvironmentVariableCredentialsProvider: com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))

There is a <span class="inline-code">WARN</span> log entry which might be relevant:

WARN  io.delta.flink.internal.table.HadoopUtils                    [] - Could not find Hadoop configuration via any of the supported methods (Flink configuration, environment variables).

This also ties in with the <span class="inline-code">No AWS credentials in the Hadoop configuration</span> message from <span class="inline-code">SimpleAWSCredentialsProvider</span>.

Looking at <span class="inline-code">io.delta.flink.internal.table.HadoopUtils</span> shows an interesting message:

This class was backported from Flink’s flink-hadoop-fs module, and it contains a subset of methods comparing to the original class. We kept only needed methods.

What’s more, it includes this:

FLINK_CONFIG_PREFIXES = {"flink.hadoop."};

I talked about <span class="inline-code">FLINK_CONFIG_PREFIXES</span> in my previous Flink S3 troubleshooting post. Taking a bit of a punt, let’s change our <span class="inline-code">flink-conf.yaml</span> and add this <span class="inline-code">flink.hadoop.</span> prefix:

flink.hadoop.fs.s3a.access.key: admin
flink.hadoop.fs.s3a.secret.key: password
flink.hadoop.fs.s3a.endpoint: http://localhost:9000
flink.hadoop.fs.s3a.path.style.access: true

After this, a different error!

But before we get to that, let’s just record for posterity what we’re now seeing in the log file:

INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: flink.hadoop.fs.s3a.access.key, admin
INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: flink.hadoop.fs.s3a.secret.key, ******
INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: flink.hadoop.fs.s3a.endpoint, http://localhost:9000
INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: flink.hadoop.fs.s3a.path.style.access, true
[…]
DEBUG io.delta.flink.internal.table.HadoopUtils                    [] - Adding Flink config entry for flink.hadoop.fs.s3a.secret.key as fs.s3a.secret.key=password to Hadoop config
DEBUG io.delta.flink.internal.table.HadoopUtils                    [] - Adding Flink config entry for flink.hadoop.fs.s3a.access.key as fs.s3a.access.key=admin to Hadoop config
DEBUG io.delta.flink.internal.table.HadoopUtils                    [] - Adding Flink config entry for flink.hadoop.fs.s3a.path.style.access as fs.s3a.path.style.access=true to Hadoop config
DEBUG io.delta.flink.internal.table.HadoopUtils                    [] - Adding Flink config entry for flink.hadoop.fs.s3a.endpoint as fs.s3a.endpoint=http://localhost:9000 to Hadoop config
[…]
DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList           [] - Using credentials from SimpleAWSCredentialsProvider

So, similar to (but different from 😡) the previous S3 configuration handling, what’s needed in the Flink configuration file isn’t what the <span class="inline-code">S3AFileSystem</span> module is actually using (or logging), which is all <span class="inline-code">fs.s3a.</span>-prefixed.

More JARs Please

Anyway, back to our next error:

Flink SQL> CREATE TABLE c_delta.db_new.t_foo (c1 varchar, c2 int) WITH ('connector' = 'delta',  'table-path' = 's3a://warehouse/');

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: shapeless.Generic

Google wasn’t much help on this one, but I eventually found this line in the Delta build script which references <span class="inline-code">com.chuusai</span>, and that pointed me to shapeless_2.12-2.3.4.jar on Maven. Adding this to <span class="inline-code">./lib</span> did the trick!

Success! (almost)

Flink SQL> CREATE TABLE c_delta.db_new.t_foo (c1 varchar, c2 int)
WITH ('connector' = 'delta',  'table-path' = 's3a://warehouse/');

[INFO] Execute statement succeed.

On MinIO there’s a Delta log file:

$ mc ls --recursive minio
[2024-07-09 16:32:16 UTC]   776B STANDARD warehouse/_delta_log/00000000000000000000.json

$ mc cat minio/warehouse/_delta_log/00000000000000000000.json

The contents look like this:

{
    "commitInfo": {
        "timestamp": 1720542736002,
        "operation": "CREATE TABLE",
        "operationParameters": {
            "partitionBy": "[]",
            "description": null,
            "properties": "{}",
            "isManaged": false
        },
        "isolationLevel": "SnapshotIsolation",
        "isBlindAppend": true,
        "operationMetrics": {},
        "engineInfo": "flink-engine/1.16.1-flink-delta-connector/3.2.0Delta-Standalone/3.2.0"
    }
}
{
    "protocol": {
        "minReaderVersion": 1,
        "minWriterVersion": 2
    }
}
{
    "metaData": {
        "id": "e2f4dcb8-d4c7-4456-b758-c140a58bc190",
        "name": "t_foo",
        "format": {
            "provider": "parquet",
            "options": {}
        },
        "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}",
        "partitionColumns": [],
        "configuration": {},
        "createdTime": 1720542735715
    }
}

Even More JARs Please

Now can we write data to our new Delta Lake table?

Flink SQL> INSERT INTO c_delta.db_new.t_foo VALUES ('a',42);

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.formats.parquet.row.ParquetRowDataBuilder

Nope. At least, not yet. Let’s chuck the <span class="inline-code">flink-sql-parquet</span> JAR into <span class="inline-code">./lib</span> and see if that helps.

It does help! It’s been submitted!

[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 1dbe63859fa78caf50bc93ed75861907

…and it’s failed:

Flink SQL> SHOW JOBS;
+----------------------------------+----------------------------------+--------+-------------------------+
|                           job id |                         job name | status |              start time |
+----------------------------------+----------------------------------+--------+-------------------------+
| 1dbe63859fa78caf50bc93ed75861907 | insert-into_c_delta.db_new.t_foo | FAILED | 2024-07-09T16:36:07.096 |
+----------------------------------+----------------------------------+--------+-------------------------+
1 row in set

S3 Authentication (again!)

Looking at the <span class="inline-code">taskexecutor</span> log file from the Flink task manager we can see a very familiar error:

org.apache.flink.runtime.taskmanager.Task                    [] - t_foo[2]: Writer -> t_foo[2]: Committer (1/1)#0
(bd4e4c0cb7f1772d6bdfdf702755dc6f_20ba6b65f97481d5570070de90e4e791_0_0) switched from RUNNING to FAILED with failure cause:
java.nio.file.AccessDeniedException: part-d2ef12d1-300c-4d3b-b5ce-72fdcc359bb7-0.snappy.parquet:
org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException:
No AWS Credentials provided by DynamicTemporaryAWSCredentialsProvider TemporaryAWSCredentialsProvider SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider IAMInstanceCredentialsProvider :
com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))

Now this is weird, because we successfully wrote to S3 (MinIO) when we created the table, in the very same session. It’s there, in black and white (or whatever l33t colour scheme you have your terminal set to):

$ mc ls --recursive minio
[2024-07-09 16:32:16 UTC]   776B STANDARD warehouse/_delta_log/00000000000000000000.json

Looking through the <span class="inline-code">taskexecutor</span> log file in more detail, I can also see successful interaction between the S3 client and MinIO, requesting:

  1. <span class="inline-code">s3a://warehouse/_delta_log/_last_checkpoint</span> (which doesn’t exist, since we’ve only just created the table)
  2. <span class="inline-code">s3a://warehouse/_delta_log</span> (which doesn’t exist; it’s a 'folder', not an object)
  3. <span class="inline-code">s3a://warehouse</span> (which exists with an object below it <span class="inline-code">_delta_log/00000000000000000000.json</span>)
  4. etc.

Here’s the corresponding excerpt from the log:
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Getting path status for s3a://warehouse/_delta_log/_last_checkpoint  (_delta_log/_last_checkpoint); needEmptyDirectory=false
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - S3GetFileStatus s3a://warehouse/_delta_log/_last_checkpoint
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - HEAD _delta_log/_last_checkpoint with change tracker null
DEBUG com.amazonaws.request                                        [] - Sending Request: HEAD http://localhost:9000 /warehouse/_delta_log/_last_checkpoint
DEBUG com.amazonaws.request                                        [] - Received error response: com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 17E0D513A0355CEF; S3 Extended Request ID: dd9025
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Not Found: s3a://warehouse/_delta_log/_last_checkpoint
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Getting path status for s3a://warehouse/_delta_log  (_delta_log); needEmptyDirectory=false
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - S3GetFileStatus s3a://warehouse/_delta_log
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - HEAD _delta_log with change tracker null
DEBUG com.amazonaws.request                                        [] - Sending Request: HEAD http://localhost:9000 /warehouse/_delta_log
DEBUG com.amazonaws.request                                        [] - Received error response: com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 17E0D513A1419BC2; S3 Extended Request ID: dd9025
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - LIST List warehouse:/_delta_log/ delimiter=/ keys=2 requester pays=false
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Starting: LIST
DEBUG com.amazonaws.request                                        [] - Sending Request: GET http://localhost:9000 /warehouse/
DEBUG com.amazonaws.request                                        [] - Received successful response: 200, AWS Request ID: 17E0D513A1C2EDE8
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - LIST: duration 0:00.081s
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Found path as directory (with /)
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Prefix count = 0; object count=1
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Summary: _delta_log/00000000000000000000.json 776
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - List status for path: s3a://warehouse/_delta_log
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - listStatus: doing listObjects for directory _delta_log/
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - LIST List warehouse:/_delta_log/ delimiter=/ keys=5000 requester pays=false
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Starting: LIST
DEBUG com.amazonaws.request                                        [] - Sending Request: GET http://localhost:9000 /warehouse/
DEBUG com.amazonaws.request                                        [] - Received successful response: 200, AWS Request ID: 17E0D513A706C033
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - LIST: duration 0:00.008s
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - s3a://warehouse/_delta_log/00000000000000000000.json: _delta_log/00000000000000000000.json size=776
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Adding: S3AFileStatus{path=s3a://warehouse/_delta_log/00000000000000000000.json; isDirectory=false; length=776; replication=1; blocksize=33554432; modification_time=1720542736275; access_time=0; owner=rmoff; grou
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Added 1 entries; ignored 0; hasNext=true; hasMoreObjects=false

So the S3 client and MinIO are clearly co-operating. But writing the actual data file (<span class="inline-code">part-d2ef12d1-300c-4d3b-b5ce-72fdcc359bb7-0.snappy.parquet</span>) fails. Searching the log for the file name shows that the upload attempt appears before the S3 interactions that I’ve noted above:

INFO  org.apache.flink.runtime.taskmanager.Task                    [] - t_foo[2]: Writer -> t_foo[2]: Committer (1/1)#0 (bd4e4c0cb7f1772d6bdfdf702755dc6f_20ba6b65f97481d5570070de90e4e791_0_0) switched from INITIALIZING to RUNNING.
DEBUG io.delta.flink.sink.internal.writer.DeltaWriterBucket        [] - Opening new part file for bucket id= due to element org.apache.flink.table.data.binary.BinaryRowData@acbf1a5e.
DEBUG io.delta.flink.sink.internal.writer.DeltaWriterBucket        [] - Opening new part file "part-d2ef12d1-300c-4d3b-b5ce-72fdcc359bb7-0.snappy.parquet" for bucket id=.
DEBUG org.apache.hadoop.fs.s3a.WriteOperationHelper                [] - Initiating Multipart upload to part-d2ef12d1-300c-4d3b-b5ce-72fdcc359bb7-0.snappy.parquet
DEBUG org.apache.hadoop.fs.s3a.Invoker                             [] - Starting: initiate MultiPartUpload
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Initiate multipart upload to part-d2ef12d1-300c-4d3b-b5ce-72fdcc359bb7-0.snappy.parquet

Just underneath this bit is a log message which, knowing what I now know about successful S3 log messages, is concerning: <span class="inline-code">SimpleAWSCredentialsProvider: No AWS credentials in the Hadoop configuration</span>

DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList           [] - No credentials from SimpleAWSCredentialsProvider: org.apache.hadoop.fs.s3a.auth.NoAwsCredentialsException: SimpleAWSCredentialsProvider: No AWS credentials in the Hadoop configuration

BUT just a few milliseconds later:

DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList           [] - Using credentials from SimpleAWSCredentialsProvider

Something is playing silly buggers here 🤔. Looking further through the log I can see similarly conflicting messages, this time about the endpoint configuration:

DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Creating endpoint configuration for ""
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Using default endpoint -no need to generate a configuration
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - fs.s3a.endpoint.region="us-east-1"
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Using default endpoint; setting region to us-east-1

DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Creating endpoint configuration for "http://localhost:9000"
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Endpoint URI = http://localhost:9000
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Endpoint http://localhost:9000 is not the default; parsing
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Region for endpoint http://localhost:9000, URI http://localhost:9000 is determined as null

It seems we have a Jekyll and Hyde scenario 😇 👿, with one playing nicely and using the config, and one doing its damnedest to ignore it 🤪.

Let’s try something random. Let’s add the original format of the S3 config back into <span class="inline-code">flink-conf.yaml</span>, so that the same settings are present with both prefixes:

flink.hadoop.fs.s3a.access.key: admin
flink.hadoop.fs.s3a.secret.key: password
flink.hadoop.fs.s3a.endpoint: http://localhost:9000
flink.hadoop.fs.s3a.path.style.access: true

fs.s3a.access.key: admin
fs.s3a.secret.key: password
fs.s3a.endpoint: http://localhost:9000
fs.s3a.path.style.access: true

Reader, it worked.

Flink SQL> CREATE CATALOG c_delta WITH ( 'type' = 'delta-catalog', 'catalog-type' = 'in-memory');
[INFO] Execute statement succeed.

Flink SQL> CREATE DATABASE c_delta.db_new;
[INFO] Execute statement succeed.

Flink SQL> INSERT INTO c_delta.db_new.t_foo VALUES ('a',42);
[INFO] Execute statement succeed.

Flink SQL> SHOW JOBS;
+----------------------------------+----------------------------------+----------+-------------------------+
|                           job id |                         job name |   status |              start time |
+----------------------------------+----------------------------------+----------+-------------------------+
| 8333654c32374acff55c5bfa800738f7 | insert-into_c_delta.db_new.t_foo | FINISHED | 2024-07-10T11:58:49.687 |
+----------------------------------+----------------------------------+----------+-------------------------+
1 row in set

Flink SQL> SELECT * FROM c_delta.db_new.t_foo;
+----+--------------------------------+-------------+
| op |                             c1 |          c2 |
+----+--------------------------------+-------------+
| +I |                              a |          42 |
+----+--------------------------------+-------------+
Received a total of 1 rows

Troubleshooting: Will the real S3AFileSystem please stand up?

Since we’re into the realms of trying random shit, let’s think about the <span class="inline-code">S3AFileSystem</span> dependency. If we’re just writing vanilla JSON (for example) from Flink, we add the <span class="inline-code">flink-s3-fs-hadoop</span> JAR. But as we saw above, that wasn’t enough for Delta Lake, and we had to add <span class="inline-code">hadoop-aws</span>. In our working environment we therefore have both present. Perhaps one is doing some of the metadata work, whilst the other is handling the data writes themselves.

On that basis, let’s get rid of <span class="inline-code">flink-s3-fs-hadoop</span> temporarily and test this hypothesis. Since the <span class="inline-code">CREATE TABLE</span> above failed without <span class="inline-code">hadoop-aws</span>, that suggests <span class="inline-code">hadoop-aws</span> handles the metadata, and that <span class="inline-code">flink-s3-fs-hadoop</span> is used for writing the data. If we remove <span class="inline-code">flink-s3-fs-hadoop</span>, the <span class="inline-code">CREATE TABLE</span> should still work, and the <span class="inline-code">INSERT INTO</span> should fail.

$ rm -r plugins/s3-fs-hadoop

Now we restart the Flink cluster, and try our test SQL statements again:

Flink SQL> CREATE CATALOG c_delta WITH ( 'type' = 'delta-catalog', 'catalog-type' = 'in-memory')
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE c_delta.db_new.t_foo (c1 varchar, c2 int)  WITH (  'connector' = 'delta',  'table-path' = 's3a://warehouse/')
[INFO] Execute statement succeed.

Flink SQL> INSERT INTO c_delta.db_new.t_foo VALUES ('a',42)
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3a'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.

There we go—it’s a double act, with <span class="inline-code">hadoop-aws</span> and <span class="inline-code">flink-s3-fs-hadoop</span> each responsible for part of the writing to S3, but each using different configuration parameters.

Finding the smoking gun

Having a bit of an itch to scratch here, I wanted to find out more about the conflicting log messages. After a perusal of the log4j PatternLayout codes I amended <span class="inline-code">log4j.properties</span> to use:

appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x tid:%tid [%-60threadName] - %m %n

Which then turned up the following—note the different <span class="inline-code">tid</span> values:

DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] tid:79 [t_foo[2]: Writer -> t_foo[2]: Committer (1/1)#0             ] - Creating endpoint configuration for ""
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] tid:79 [t_foo[2]: Writer -> t_foo[2]: Committer (1/1)#0             ] - Using default endpoint -no need to generate a configuration
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] tid:79 [t_foo[2]: Writer -> t_foo[2]: Committer (1/1)#0             ] - fs.s3a.endpoint.region="us-east-1"
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] tid:79 [t_foo[2]: Writer -> t_foo[2]: Committer (1/1)#0             ] - Using default endpoint; setting region to us-east-1

DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] tid:80 [t_foo[2]: Global Committer (1/1)#0                          ] - Creating endpoint configuration for "http://localhost:9000"
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] tid:80 [t_foo[2]: Global Committer (1/1)#0                          ] - Endpoint URI = http://localhost:9000
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] tid:80 [t_foo[2]: Global Committer (1/1)#0                          ] - Endpoint http://localhost:9000 is not the default; parsing
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] tid:80 [t_foo[2]: Global Committer (1/1)#0                          ] - Region for endpoint http://localhost:9000, URI http://localhost:9000 is determined as null

DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList           [] tid:79 [t_foo[2]: Writer -> t_foo[2]: Committer (1/1)#0             ] - No credentials from SimpleAWSCredentialsProvider: org.apache.hadoop.fs.s3a.auth.NoAwsCredentialsException: SimpleAWSCredentialsProvider: No AWS credentials in the Hadoop configuration

DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList           [] tid:80 [t_foo[2]: Global Committer (1/1)#0                          ] - Using credentials from SimpleAWSCredentialsProvider

So there are two threads: a <span class="inline-code">Global Committer</span> and a <span class="inline-code">Writer</span> (for table <span class="inline-code">t_foo</span>). As we saw above, writes of the table data (the <span class="inline-code">Writer</span> thread) seem to be done using the <span class="inline-code">S3AFileSystem</span> from <span class="inline-code">flink-s3-fs-hadoop</span>, whilst the metadata writes (the <span class="inline-code">Global Committer</span> thread) use the <span class="inline-code">S3AFileSystem</span> from <span class="inline-code">hadoop-aws</span>.
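You can see the duplication for yourself, since both JARs ship their own copy of the class (paths here assume the layout used earlier):

jar tf ./plugins/s3-fs-hadoop/flink-s3-fs-hadoop-1.18.1.jar | grep S3AFileSystem.class
jar tf ./lib/delta/hadoop-aws-3.3.4.jar | grep S3AFileSystem.class

Both should list <span class="inline-code">org/apache/hadoop/fs/s3a/S3AFileSystem.class</span>.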


Robin Moffatt

Robin is a Principal DevEx Engineer at Decodable. He has been speaking at conferences since 2009 including QCon, Devoxx, Strata, Kafka Summit, and Øredev. You can find many of his talks online and his articles on the Decodable blog as well as his own blog.

Outside of work, Robin enjoys running, drinking good beer, and eating fried breakfasts—although generally not at the same time.