August 6, 2024
7 min read

Troubleshooting Flink SQL S3 problems

By Robin Moffatt

You’d think once was enough. Having already written about the trouble that I had getting Flink SQL to write to S3 (including on MinIO) this should now be a moot issue for me. Right? RIGHT?!


Well perhaps it is. I’m not yet sure. But what’s led me down this path again is trying to get the Delta Lake connector to work in Flink SQL, and butting up against S3 problems. In troubleshooting those I ended up back at trying to understand in more detail what a functioning S3 configuration looks like so that I could rule out issues other than the Delta Lake connector.

For Delta Lake and Flink itself you’ll need to stay tuned to the blog, as I write about that in a separate article. Today we’re just looking at some troubleshooting tips for S3 access in Flink SQL in general.

So what we’ve got here is a methodical explanation of how I understand the S3 configuration and libraries to work, keeping things as vanilla as possible to start with. Filesystem connector, JSON format.

Huge caveat: I am just a humble end-user of this stuff. I don’t code Java. I don’t actually know what I’m doing half the time 😉.

Test rig

I’m using Flink 1.18.1, with MinIO providing S3-compatible storage, running locally in a Docker container and available on port 9000.
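For completeness, the MinIO container is started with something along these lines (the exact invocation is my assumption rather than gospel; the bit that matters is the root user and password, which are the S3 credentials we’ll configure in Flink later on):

docker run --rm -d --name minio \
  -p 9000:9000 -p 9001:9001 \
  -e MINIO_ROOT_USER=admin \
  -e MINIO_ROOT_PASSWORD=password \
  minio/minio server /data --console-address ":9001"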

This is my test statement:

CREATE TABLE t_foo_fs (c1 varchar, c2 int)
     WITH (
      'connector' = 'filesystem',
      'path' = 's3a://warehouse/t_foo_fs/',
      'format' = 'json'
     );

INSERT INTO t_foo_fs VALUES ('a',42);

I’m using json format not because it’s a good idea, but because it doesn’t need any further dependencies installed, and I want to keep this as small and easily-reproduced as possible.

I’m using the default ephemeral in-memory catalog, so each time I restart my session I have to define the table again.
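One way around this, which I’m not doing here but is handy to know about, is to put the DDL in a file and pass it to the SQL Client as an init script, something like:

# init.sql holds the CREATE TABLE statement shown above
./bin/sql-client.sh -i init.sql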

Once the table is created I run the <span class="inline-code">INSERT</span>, which the SQL Client passes as a job to the Job Manager to run (<span class="inline-code">Submitting SQL update statement to the cluster…​</span>). This means that the SQL Client will say <span class="inline-code">successfully</span> in its message after you run the <span class="inline-code">INSERT</span>, but look carefully: it’s only telling you that the statement was submitted successfully:

[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 34f3b66a1a6f89bea83145af8289aca5

You therefore have to go and check whether the job itself actually worked.
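The quickest way to do that is from the SQL Client itself with <span class="inline-code">SHOW JOBS</span> (we’ll see its output later in this post), or in the Flink Web UI (<span class="inline-code">http://localhost:8081</span> by default):

Flink SQL> SHOW JOBS;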

To make this reproducible I ran the following:

rm log/*.* &&
  ./bin/start-cluster.sh &&
  ./bin/sql-client.sh &&
  ./bin/stop-cluster.sh &&
  ps -ef|grep java|grep flink|awk '{print $2}'|xargs -Ifoo kill -9 foo &&
  jps

This clears out the log directory, starts Flink and the SQL Client, and then waits for the client to exit. Here I run the <span class="inline-code">CREATE TABLE</span> and <span class="inline-code">INSERT</span>, and then exit the client. This then causes the remainder of the statements to run, which shuts down the Flink cluster, kills any hanging processes, and runs <span class="inline-code">jps</span> to confirm this.

Logging

Getting an insight into what’s going on is a two-fold process:

  1. What is running where (across SQL Client, Job Manager, and other components such as catalog metastores)
  2. Logs!

To enable logging for Flink when running it as a local binary (started with <span class="inline-code">./bin/start-cluster.sh</span>) together with the SQL Client, change two files:

  1. <span class="inline-code">./conf/log4j.properties</span>
  2. <span class="inline-code">./conf/log4j-cli.properties</span>

To both of these I added the following:

logger.fs.name = org.apache.hadoop.fs
logger.fs.level = TRACE
logger.fs2.name = org.apache.flink.fs
logger.fs2.level = TRACE
logger.aws.name = software.amazon
logger.aws.level = TRACE
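With those log levels in place, the output we care about ends up in Flink’s <span class="inline-code">./log</span> folder; to follow the Task Manager’s log as the job runs I use something like:

tail -f log/*taskexecutor*.log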

Dependency: Hadoop S3 plugin

If you don’t include the plugin and try to write to an <span class="inline-code">s3://</span> path you’ll get this error:

org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'.
The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop, flink-s3-fs-presto.
Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ for more information.
If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems.
For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.

For an <span class="inline-code">s3a://</span> path you’ll get basically the same:

org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3a'.
The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop.
[…]

So per the docs, you need the Hadoop S3 plugin, which ships with Flink but isn’t in place by default.

mkdir ./plugins/s3-fs-hadoop && \
cp ./opt/flink-s3-fs-hadoop-1.18.1.jar ./plugins/s3-fs-hadoop/
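A quick sanity check that the plugin is where Flink expects it:

$ ls ./plugins/s3-fs-hadoop/
flink-s3-fs-hadoop-1.18.1.jar

Flink loads plugins at startup, so restart the cluster after adding it.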

Log File Diving

With the log levels set to <span class="inline-code">TRACE</span>, let’s go and have a look at what we get. Here's the output from the <span class="inline-code">taskexecutor</span> log (in the Flink <span class="inline-code">./log</span> folder).

The first entry we have is the S3 filesystem libraries starting. <span class="inline-code">org.apache.flink.fs.s3</span> handles the s3 scheme and hands this off to <span class="inline-code">org.apache.hadoop.fs.s3a.S3AFileSystem</span>, which does most of the rest of the work.

The Hadoop <span class="inline-code">S3AFileSystem</span> is shown as <span class="inline-code">Initializing</span> for <span class="inline-code">warehouse</span>, where <span class="inline-code">warehouse</span> is the bucket to which the table data is to be written (<span class="inline-code">'path' = 's3a://warehouse/t_foo_fs/'</span>).

DEBUG org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory    [] - Creating S3 file system backed by Hadoop s3a file system
DEBUG org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory    [] - Loading Hadoop configuration for Hadoop s3a file system
DEBUG org.apache.flink.fs.s3hadoop.S3FileSystemFactory             [] - Using scheme s3a://warehouse/t_foo_fs for s3a file system backing the S3 File System
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Initializing S3AFileSystem for warehouse

Next up is a bunch of entries covering config values and so on. One point to note is <span class="inline-code">Propagating entries under</span>, which ties into the idea of per-bucket configuration (more on that just after this log excerpt).

DEBUG org.apache.hadoop.fs.s3a.S3AUtils                            [] - Propagating entries under fs.s3a.bucket.warehouse.
DEBUG org.apache.hadoop.fs.s3a.S3AUtils                            [] - Data is unencrypted
DEBUG org.apache.hadoop.fs.s3a.S3ARetryPolicy                      [] - Retrying on recoverable AWS failures 7 times with an initial interval of 500ms
INFO  org.apache.hadoop.metrics2.impl.MetricsConfig                [] - Loaded properties from hadoop-metrics2.properties
INFO  org.apache.hadoop.metrics2.impl.MetricsSystemImpl            [] - Scheduled Metric snapshot period at 10 second(s).
INFO  org.apache.hadoop.metrics2.impl.MetricsSystemImpl            [] - s3a-file-system metrics system started
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Client Side Encryption enabled: false
WARN  org.apache.hadoop.util.NativeCodeLoader                      [] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
DEBUG org.apache.hadoop.fs.s3a.S3ARetryPolicy                      [] - Retrying on recoverable AWS failures 7 times with an initial interval of 500ms
DEBUG org.apache.hadoop.fs.s3a.S3GuardExistsRetryPolicy            [] - Retrying on recoverable S3Guard table/S3 inconsistencies 7 times with an initial interval of 2000ms
DEBUG org.apache.hadoop.fs.s3a.S3AUtils                            [] - Value of fs.s3a.paging.maximum is 5000
DEBUG org.apache.hadoop.fs.s3a.S3AUtils                            [] - Value of fs.s3a.block.size is 33554432
DEBUG org.apache.hadoop.fs.s3a.S3AUtils                            [] - Value of fs.s3a.readahead.range is 65536
DEBUG org.apache.hadoop.fs.s3a.S3AUtils                            [] - Value of fs.s3a.max.total.tasks is 32
DEBUG org.apache.hadoop.fs.s3a.S3AUtils                            [] - Value of fs.s3a.threads.keepalivetime is 60
DEBUG org.apache.hadoop.fs.s3a.S3AUtils                            [] - Value of fs.s3a.executor.capacity is 16
DEBUG org.apache.hadoop.fs.s3a.auth.SignerManager                  [] - No custom signers specified
DEBUG org.apache.hadoop.fs.s3a.audit.AuditIntegration              [] - auditing is disabled
DEBUG org.apache.hadoop.fs.s3a.audit.AuditIntegration              [] - Started Audit Manager Service NoopAuditManagerS3A in state NoopAuditManagerS3A: STARTED
DEBUG org.apache.hadoop.fs.s3a.S3AUtils                            [] - Value of fs.s3a.internal.upload.part.count.limit is 10000
DEBUG org.apache.hadoop.fs.s3a.S3ARetryPolicy                      [] - Retrying on recoverable AWS failures 7 times with an initial interval of 500ms
DEBUG org.apache.hadoop.fs.s3a.S3AUtils                            [] - Credential provider class is org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider
DEBUG org.apache.hadoop.fs.s3a.S3AUtils                            [] - Credential provider class is org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider
DEBUG org.apache.hadoop.fs.s3a.S3AUtils                            [] - Credential provider class is org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
DEBUG org.apache.hadoop.fs.s3a.S3AUtils                            [] - Credential provider class is com.amazonaws.auth.EnvironmentVariableCredentialsProvider
DEBUG org.apache.hadoop.fs.s3a.S3AUtils                            [] - Credential provider class is org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider
DEBUG org.apache.hadoop.fs.s3a.S3AUtils                            [] - For URI s3a://warehouse/t_foo_fs, using credentials AWSCredentialProviderList[refcount= 1: [org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider@1376bee, TemporaryAWSCredentialsProvider, SimpleAWSCredentialsProvider, EnvironmentVariableCredentialsProvider, org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider@4f2b477b]
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Using credential provider AWSCredentialProviderList[refcount= 1: [org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider@1376bee, TemporaryAWSCredentialsProvider, SimpleAWSCredentialsProvider, EnvironmentVariableCredentialsProvider, org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider@4f2b477b]
DEBUG org.apache.hadoop.fs.s3a.S3AUtils                            [] - Value of fs.s3a.connection.maximum is 96
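As an aside on that per-bucket configuration: the Hadoop S3A client lets you scope a setting to a single bucket by embedding the bucket name in the property key, which is what the <span class="inline-code">fs.s3a.bucket.warehouse.</span> prefix in the log refers to. I’ve not tested it in this setup, but in principle something like this would apply the endpoint to just the <span class="inline-code">warehouse</span> bucket:

fs.s3a.bucket.warehouse.endpoint: http://localhost:9000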

Next is another <span class="inline-code">DEBUG</span> entry but with something that looks like an error:

DEBUG org.apache.hadoop.fs.s3a.impl.NetworkBinding                 [] - Unable to create class org.apache.hadoop.fs.s3a.impl.ConfigureShadedAWSSocketFactory, value of fs.s3a.ssl.channel.mode will be ignored
java.lang.NoClassDefFoundError: com/amazonaws/thirdparty/apache/http/conn/socket/ConnectionSocketFactory
    at java.lang.Class.forName0(Native Method) ~[?:?]
    at java.lang.Class.forName(Class.java:315) ~[?:?]
    at org.apache.hadoop.fs.s3a.impl.NetworkBinding.bindSSLChannelMode(NetworkBinding.java:89) ~[flink-s3-fs-hadoop-1.18.1.jar:1.18.1]
    at org.apache.hadoop.fs.s3a.S3AUtils.initProtocolSettings(S3AUtils.java:1347) ~[flink-s3-fs-hadoop-1.18.1.jar:1.18.1]
[…]
Caused by: java.lang.ClassNotFoundException: com.amazonaws.thirdparty.apache.http.conn.socket.ConnectionSocketFactory
[…]

I assume this is just some internal housekeeping, since it’s logged at <span class="inline-code">DEBUG</span> rather than raised as an error, so we’ll ignore it for now.

Then some more config values, and then an interesting one (since we’re using MinIO)—the endpoint configuration. Since we’ve not yet set it, it’s unsurprising to see that it’s using the default (<span class="inline-code">Using default endpoint</span>):

DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Creating endpoint configuration for ""
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Using default endpoint -no need to generate a configuration
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - fs.s3a.endpoint.region="us-east-1"
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Using default endpoint; setting region to us-east-1

I’ll skip over a bit more of the same kind of background <span class="inline-code">DEBUG</span> stuff, and highlight this bit where we start to see the file paths mentioned.

DEBUG org.apache.hadoop.fs.s3a.WriteOperationHelper                [] - Initiating Multipart upload to t_foo_fs/part-5d498f53-ec06-4ee4-9fe2-5c7763755200-0-0
DEBUG org.apache.hadoop.fs.s3a.Invoker                             [] - Starting: initiate MultiPartUpload
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Initiate multipart upload to t_foo_fs/part-5d498f53-ec06-4ee4-9fe2-5c7763755200-0-0
DEBUG org.apache.hadoop.fs.statistics.impl.IOStatisticsStoreImpl   [] - Incrementing counter object_multipart_initiated by 1 with final value 1

Remembering the value of <span class="inline-code">path ('path' = 's3a://warehouse/t_foo_fs/')</span>, we can see that the bucket has been stripped away, leaving just the 'folder' (which isn’t really a folder; S3 doesn’t have them) of <span class="inline-code">t_foo_fs</span>, and then the actual data file for Flink to upload (<span class="inline-code">part-5d498f53-ec06-4ee4-9fe2-5c7763755200-0-0</span>).

So we’re now at the part of the S3 process where it wants to write the data. We kinda know it’s going to fail anyway because we didn’t configure the endpoint; but we also didn’t configure any credentials and it’s going to be that which trips things up first:

DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList           [] - No credentials from org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider@1376bee: org.apache.hadoop.fs.s3a.auth.NoAwsCredentialsException: Dynamic session credentials for Flink: No AWS Credentials
DEBUG org.apache.hadoop.fs.s3a.Invoker                             [] - Starting: create credentials
DEBUG org.apache.hadoop.fs.s3a.Invoker                             [] - create credentials: duration 0:00.001s
DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList           [] - No credentials from TemporaryAWSCredentialsProvider: org.apache.hadoop.fs.s3a.auth.NoAwsCredentialsException: Session credentials in Hadoop configuration: No AWS Credentials
DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList           [] - No credentials from SimpleAWSCredentialsProvider: org.apache.hadoop.fs.s3a.auth.NoAwsCredentialsException: SimpleAWSCredentialsProvider: No AWS credentials in the Hadoop configuration
DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList           [] - No credentials provided by EnvironmentVariableCredentialsProvider: com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
    at com.amazonaws.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:49) ~[?:?]
[…]
DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList           [] - No credentials from org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider@4f2b477b: org.apache.hadoop.fs.s3a.auth.NoAwsCredentialsException: IAMInstanceCredentialsProvider: Failed to connect to service endpoint:

What’s useful here is that you can see the code go through the different credential source options, including environment variables (<span class="inline-code">EnvironmentVariableCredentialsProvider</span>) and the Hadoop configuration (<span class="inline-code">SimpleAWSCredentialsProvider</span>).
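It also means that, in theory (I’m not doing it here), we could supply the credentials through the environment variables that <span class="inline-code">EnvironmentVariableCredentialsProvider</span> looks for, exported before the cluster is started:

export AWS_ACCESS_KEY_ID=admin
export AWS_SECRET_ACCESS_KEY=password
./bin/start-cluster.sh

None of that is set in this run, though, so the provider chain comes up empty.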

With the credentials unavailable, the Flink job then fails:

WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Values[1] -> StreamingFileWriter -> Sink: end (1/1)#0 (25de23919c70373c90645ab5b7bb1b8a_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED with failure cause:
java.nio.file.AccessDeniedException: t_foo_fs/part-5d498f53-ec06-4ee4-9fe2-5c7763755200-0-0:
org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException:
No AWS Credentials provided by DynamicTemporaryAWSCredentialsProvider TemporaryAWSCredentialsProvider SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider IAMInstanceCredentialsProvider :
com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
[…]

Credentials and Configuration

As we saw from the log above, without any S3 configuration provided the job fails. So let’s rectify that and tell Flink how to authenticate to S3 (MinIO). Per the docs, this is done in the Flink config file:

./conf/flink-conf.yaml

[…]
s3.access.key: admin
s3.secret.key: password

Now we see from the log file that these credentials are picked up by <span class="inline-code">SimpleAWSCredentialsProvider</span>:

DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList           [] - No credentials from TemporaryAWSCredentialsProvider: org.apache.hadoop.fs.s3a.auth.NoAwsCredentialsException: Session credentials in Hadoop configuration: No AWS Credentials
DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList           [] - Using credentials from SimpleAWSCredentialsProvider

What happens next is interesting. We see the actual call from the AWS library to S3 itself:

DEBUG com.amazonaws.request                                        [] - Sending Request: HEAD https://warehouse.s3.amazonaws.com / Headers: (amz-sdk-invocation-id: e887a1da-8ab6-26aa-20a7-7626c5e75a18, Content-Type: application/octet-stream, User-Agent: Hadoop 3.3.4, aws-sdk-java/1.12.319 Mac_OS_X/14.5 OpenJDK_64-Bit_Server_VM/11.0.21+9 java/11.0.21 vendor/Eclipse_Adoptium cfg/retry-mode/legacy, )

Note the hostname there, <span class="inline-code">HEAD https://warehouse.s3.amazonaws.com</span>. This is actually going out to Amazon S3 itself, which, since we’re using credentials for MinIO, isn’t going to work.

The <span class="inline-code">HEAD</span> fails with an HTTP 400 error:

DEBUG com.amazonaws.request                                        [] - Received error response: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: NZT8FG3S4ETHKT83; S3 Extended Request ID: AnoFUPCnG4gL1ve8Gly+aaP3tTGQ8tVmSN+TT57AIX/dAvw71KSUsOg2n+eh6NvI7etIoHmZ80M=; Proxy: null), S3 Extended Request ID: AnoFUPCnG4gL1ve8Gly+aaP3tTGQ8tVmSN+TT57AIX/dAvw71KSUsOg2n+eh6NvI7etIoHmZ80M=

There’s then a second HTTP request (a <span class="inline-code">POST</span>) for the file itself:

DEBUG com.amazonaws.request                                        [] - Sending Request: POST https://warehouse.s3.eu-central-1.amazonaws.com /t_foo_fs/part-12abc4d6-5b99-4627-b27a-d14788c03e36-0-0 Parameters: ({"uploads":[null]}Headers: (amz-sdk-invocation-id: f8a0359c-5115-a716-ff73-76482046b4e2, Content-Length: 0, Content-Type: application/octet-stream, User-Agent: Hadoop 3.3.4, aws-sdk-java/1.12.319 Mac_OS_X/14.5 OpenJDK_64-Bit_Server_VM/11.0.21+9 java/11.0.21 vendor/Eclipse_Adoptium cfg/retry-mode/legacy, )

This also fails, and fatally so this time:

DEBUG com.amazonaws.request                                        [] - Received error response: com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId; Request ID: NZTDTWG94GJKBSGX; S3 Extended Request ID: 7/fnAkRXUg+LiUUzlN9ydkLRuK4Mp/KNjvho4hvQFq9AQYDhwXrGKsEJ8c1yXKmNu+nb8jsfgaQ=; Proxy: null), S3 Extended Request ID: 7/fnAkRXUg+LiUUzlN9ydkLRuK4Mp/KNjvho4hvQFq9AQYDhwXrGKsEJ8c1yXKmNu+nb8jsfgaQ=

This gets floated up to Flink, which terminates the job with a failure:

WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Values[1] -> StreamingFileWriter -> Sink: end (1/1)#0 (64dd133316241806e123b88524963eb3_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED with failure cause:
java.nio.file.AccessDeniedException: t_foo_fs/part-12abc4d6-5b99-4627-b27a-d14788c03e36-0-0: initiate MultiPartUpload on t_foo_fs/part-12abc4d6-5b99-4627-b27a-d14788c03e36-0-0:
com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records.
[…]

Configuring an S3 endpoint for MinIO

If you’re using an S3-compatible object store, such as MinIO, you need to tell the Flink S3 client where to find it, since as we saw above it defaults to literally <span class="inline-code">warehouse.s3.amazonaws.com</span>.

Configuring the endpoint is covered clearly in the docs—add it to your Flink config:

s3.endpoint: http://localhost:9000

After restarting, we see the endpoint reflected in the <span class="inline-code">DEBUG</span> messages as the S3 client starts up and parses its config:

DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Creating endpoint configuration for "http://localhost:9000"
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Endpoint URI = http://localhost:9000
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Endpoint http://localhost:9000 is not the default; parsing
DEBUG org.apache.hadoop.fs.s3a.DefaultS3ClientFactory              [] - Region for endpoint http://localhost:9000, URI http://localhost:9000 is determined as null

One thing that I will point out is what’s shown in the logs a little above these endpoint messages:

DEBUG org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader       [] - Adding Flink config entry for s3.endpoint as fs.s3a.endpoint to Hadoop config
DEBUG org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader       [] - Adding Flink config entry for s3.access.key as fs.s3a.access.key to Hadoop config
DEBUG org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader       [] - Adding Flink config entry for s3.secret.key as fs.s3a.secret.key to Hadoop config

This has been a big source of confusion for me. Is it <span class="inline-code">s3.endpoint</span> or <span class="inline-code">fs.s3a.endpoint</span>? The answer is yes! It’s both! For Flink, you configure <span class="inline-code">s3.</span> which then gets mapped internally to the <span class="inline-code">fs.s3a.</span> configuration that the Hadoop-AWS module refers to in its documentation.

So, with the endpoint set, let’s see what happens. We’ll pick up where it went wrong last time: the HTTP calls to the S3 endpoint, which should now be correct:

DEBUG com.amazonaws.request                                        [] - Sending Request: POST http://warehouse.localhost:9000 /t_foo_fs/part-b933eb6c-5cc4-4a25-bd33-f314268d7f8c-0-0 Parameters: ({"uploads":[null]}Headers: (amz-sdk-invocation-id: be591818-8fad-1264-ed5b-9dd97dedb041, Content-Length: 0, Content-Type: application/octet-stream, User-Agent: Hadoop 3.3.4, aws-sdk-java/1.12.319 Mac_OS_X/14.5 OpenJDK_64-Bit_Server_VM/11.0.21+9 java/11.0.21 vendor/Eclipse_Adoptium cfg/retry-mode/legacy, )

(Interestingly, no HEAD request first this time like there was before.)

However, this fails, and if you look at the hostname, you’ll see why:

TRACE com.amazonaws.http.AmazonHttpClient                          [] - Unable to execute HTTP request: warehouse.localhost: nodename nor servname provided, or not known Request will be retried.

Somehow it’s getting <span class="inline-code">warehouse.localhost</span> from our configuration, which is not a hostname that resolves from my machine. This causes the Flink job to fail (after multiple retries):

WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Values[1] -> StreamingFileWriter -> Sink: end (1/1)#0 (de4000cf76864688506c514ebba58514_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED with failure cause:
org.apache.hadoop.fs.s3a.AWSClientIOException:
initiate MultiPartUpload on t_foo_fs/part-b933eb6c-5cc4-4a25-bd33-f314268d7f8c-0-0:
com.amazonaws.SdkClientException: Unable to execute HTTP request: warehouse.localhost: nodename nor servname provided, or not known:
[…]

This problem comes about because the default option in the S3 client is to use virtual-hosted-style requests in which the bucket name (<span class="inline-code">warehouse</span>, in our example) is prefixed to the endpoint hostname (<span class="inline-code">localhost</span>).
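Side by side, the two request styles look like this (the path-style version is what we’ll see in the log shortly, once it’s enabled):

Virtual-hosted-style: POST http://warehouse.localhost:9000 /t_foo_fs/part-…      (bucket in the hostname)
Path-style:           POST http://localhost:9000 /warehouse/t_foo_fs/part-…      (bucket in the path)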

Configuring path-style access for MinIO from Flink S3

Also covered very clearly in the Flink S3 docs is how to configure it to use path-style requests. To the Flink configuration we add:

s3.path.style.access: true

And so, to recap, our <span class="inline-code">flink-conf.yaml</span> for S3 now looks like this:

[…]
s3.access.key: admin
s3.secret.key: password
s3.endpoint: http://localhost:9000
s3.path.style.access: true

After restarting, things look pretty good. The config is being read and passed to Hadoop-AWS:

DEBUG org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader       [] - Adding Flink config entry for s3.endpoint as fs.s3a.endpoint to Hadoop config
DEBUG org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader       [] - Adding Flink config entry for s3.access.key as fs.s3a.access.key to Hadoop config
DEBUG org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader       [] - Adding Flink config entry for s3.secret.key as fs.s3a.secret.key to Hadoop config
DEBUG org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader       [] - Adding Flink config entry for s3.path.style.access as fs.s3a.path.style.access to Hadoop config

The <span class="inline-code">POST</span> call is made to the correct MinIO endpoint, which returns an HTTP 200 successful status code:

DEBUG com.amazonaws.request                                        [] - Sending Request: POST http://localhost:9000 /warehouse/t_foo_fs/part-5caebe06-17eb-405c-bfc7-3f78f7e109da-0-0 Parameters: ({"uploads":[null]}Headers: (amz-sdk-invocation-id: 68d62b6f-997b-ef70-c2ca-3bce040dee2d, Content-Length: 0, Content-Type: application/octet-stream, User-Agent: Hadoop 3.3.4, aws-sdk-java/1.12.319 Mac_OS_X/14.5 OpenJDK_64-Bit_Server_VM/11.0.21+9 java/11.0.21 vendor/Eclipse_Adoptium cfg/retry-mode/legacy, )
[…]
DEBUG com.amazonaws.request                                        [] - Received successful response: 200, AWS Request ID: 17E092B83B43303A

The upload completes successfully:

DEBUG org.apache.hadoop.fs.s3a.Invoker                             [] - Completing multipart upload: duration 0:00.012s
DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Finished write to t_foo_fs/part-5caebe06-17eb-405c-bfc7-3f78f7e109da-0-0, len 19. etag e140dda18b4f195055b066f350b52034-1, version null

We have a (very small) file on MinIO (S3):

$ mc ls --recursive minio
[2024-07-09 14:46:17 UTC]    19B STANDARD warehouse/t_foo_fs/part-5caebe06-17eb-405c-bfc7-3f78f7e109da-0-0

$ mc cat minio/warehouse/t_foo_fs/part-5caebe06-17eb-405c-bfc7-3f78f7e109da-0-0
{"c1":"a","c2":42}

And finally, the Flink job completed successfully:

Flink SQL> show jobs;
+----------------------------------+-------------------------------------------------------+----------+-------------------------+
|                           job id |                                              job name |   status |              start time |
+----------------------------------+-------------------------------------------------------+----------+-------------------------+
| 1b54b5d97a3ec6536cf38fdf7e71d22c | insert-into_default_catalog.default_database.t_foo_fs | FINISHED | 2024-07-09T14:46:16.450 |
+----------------------------------+-------------------------------------------------------+----------+-------------------------+
1 row in set

Configuration stuff

The Flink S3 docs say to use <span class="inline-code">s3.</span> for configuring S3, and we saw above that these get mapped to <span class="inline-code">fs.s3a.</span> for the Hadoop-AWS module. It’s also valid to specify <span class="inline-code">fs.s3a.</span> directly—they get read and mapped the same:

fs.s3a.access.key: admin
fs.s3a.secret.key: password
fs.s3a.endpoint: http://localhost:9000
fs.s3a.path.style.access: true

This shows up in the log thus:

DEBUG org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader       [] - Adding Flink config entry for fs.s3a.access.key as fs.s3a.access.key to Hadoop config
DEBUG org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader       [] - Adding Flink config entry for fs.s3a.secret.key as fs.s3a.secret.key to Hadoop config
DEBUG org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader       [] - Adding Flink config entry for fs.s3a.path.style.access as fs.s3a.path.style.access to Hadoop config
DEBUG org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader       [] - Adding Flink config entry for fs.s3a.endpoint as fs.s3a.endpoint to Hadoop config

If my educated-guess reading of the code is right, <span class="inline-code">HadoopConfigLoader</span> is where the config values are mapped across. The code mentions a <span class="inline-code">flink.hadoop.</span> prefix, but this seems to be overridden for <span class="inline-code">flink-s3-fs-hadoop</span> with a set of <span class="inline-code">FLINK_CONFIG_PREFIXES</span> which can be <span class="inline-code">s3.</span>, <span class="inline-code">s3a.</span>, or <span class="inline-code">fs.s3a.</span>—they’re all the same.
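In other words, as far as I can tell these are all equivalent ways of setting the same thing in the Flink configuration; pick one and stick with it:

s3.endpoint: http://localhost:9000
s3a.endpoint: http://localhost:9000
fs.s3a.endpoint: http://localhost:9000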


There’s got to be an easier way?

There is, and it’s called Decodable 😀

With pre-built connectors for S3, Apache Iceberg, Delta Lake, and dozens more, it’s the easiest way to move data. To use a connector you simply provide the necessary configuration—there’s not a JAR in sight!

Decodable also has managed Flink, so if you really want to write this stuff by hand, you can, and we’ll run it for you.

Sign up for free and give it a try.

Robin Moffatt

Robin is a Principal DevEx Engineer at Decodable. He has been speaking at conferences since 2009 including QCon, Devoxx, Strata, Kafka Summit, and Øredev. You can find many of his talks online and his articles on the Decodable blog as well as his own blog.

Outside of work, Robin enjoys running, drinking good beer, and eating fried breakfasts—although generally not at the same time.
