March 12, 2024 · 5 min read

Exploring the Flink SQL Gateway REST API

By Robin Moffatt

The SQL Gateway in Apache Flink provides a way to run SQL in Flink from places other than the SQL Client. This includes using a JDBC Driver (which opens up a multitude of clients), a Hive client via the HiveServer2 endpoint, and directly against the REST Endpoint.

As I continue my journey learning Flink SQL, one thing I've wondered is how you'd go about submitting a Flink SQL job in a production scenario. It seems to me that the SQL Gateway's REST API would be a good candidate for this: you'd put the SQL code in a file under source control, and then use a deployment pipeline to submit that SQL against the endpoint. It's worth noting that application mode is the recommended way to deploy production jobs to Flink, and the SQL Client and Gateway don't support it yet. There is a FLIP under discussion, but in the meantime if you want to use application mode you'd need to wrap your SQL in a JAR to deploy it. Either that, or continue to use the SQL Client or Gateway and be aware of the limitations of running the job in session mode (basically, no resource isolation).

In this article I'll show you how to use the REST endpoint: exploring it with Postman, calling it from the shell with HTTPie, and finishing off with a viable proof-of-concept shell script that executes SQL statements from a file.

The API is documented and has two versions, each with its own OpenAPI YAML spec. I'm going to look at <span class="inline-code">v2</span> here. Note that the docs say the spec is still experimental.

To get started, let's bring up the Flink cluster and the SQL Gateway locally:

./bin/start-cluster.sh
./bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=localhost
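
Incidentally, the gateway also exposes an <span class="inline-code">api_versions</span> endpoint that reports which REST API versions it supports; assuming it's running locally on the default port of 8083, you should be able to check it like this:

$ http --body GET 'localhost:8083/api_versions' Accept:'application/json'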

Here Comes the Postman

Postman is a handy tool for doing tons of useful stuff with APIs. Here I'm just using it to create sample calls against the endpoints and to quickly understand how they relate to each other. You can use it on the web, but assuming you're using a local instance of Flink (or at least one that is not publicly accessible) you'll want the desktop version. Note that you'll still need to sign up for a free account to access the Import feature that we're using here.

Under your Workspace click Import and paste in the URL of the SQL Gateway's OpenAPI YAML file (<span class="inline-code">https://nightlies.apache.org/flink/flink-docs-master/generated/rest_v2_sql_gateway.yml</span>) which is linked to on the docs page. Import it as a Postman Collection.


You'll now see in the Postman collection a list of all the API endpoints, with pre-created calls for each. Go to Environments -> Globals and define <span class="inline-code">baseUrl</span> with the value for your SQL Gateway. If you're running it locally then this will be <span class="inline-code">http://localhost:8083</span>.


Now go back to Collections and, under the Flink SQL Gateway REST API folder, find the <span class="inline-code">get Info</span> call. Open it up and hit Send. You should see a successful response reporting the product name and Flink version.


You can also click the Code icon (<span class="inline-code">&lt;/&gt;</span>) to see the call in various languages and tools, including cURL and HTTPie. For now this is not ground-breaking, but once you get onto payloads it's really handy.


Just as we manually populated the global variable <span class="inline-code">baseUrl</span> above, we can also take the response from one call and use it in another. This is really useful, because there are two values that the REST API returns that we need to use (<span class="inline-code">sessionHandle</span> and <span class="inline-code">operationHandle</span>). To do this in Postman, add the following to the Tests tab of the request pane:

// Parse the response body and store the sessionHandle in an environment variable
var jsonData = JSON.parse(responseBody);
postman.setEnvironmentVariable("sessionHandle", jsonData.sessionHandle);

That assumes the variable to populate is called <span class="inline-code">sessionHandle</span> and that it's returned in a root key of the response that is also called <span class="inline-code">sessionHandle</span>. Which it is:

$ http  --follow --timeout 3600 POST 'localhost:8083/sessions' \
 Content-Type:'application/json' \
 Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 56
content-type: application/json; charset=UTF-8

{
    "sessionHandle": "190edef5-df00-4182-be0e-431737b1e93b"
}
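
As an aside, newer versions of Postman favour the <span class="inline-code">pm.*</span> scripting API over the older <span class="inline-code">postman.*</span> one shown above; if you prefer that, the equivalent should be something like:

// Same effect as above, using the newer pm.* API
pm.environment.set("sessionHandle", pm.response.json().sessionHandle);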

Once you've set a variable, you can use it in other calls by referencing it in double curly braces, e.g. <span class="inline-code">{{sessionHandle}}</span>.


I’ve shared a copy of my Postman collection, with the above variable configuration done for you, here.

Let's now go through the workflow of how we'd actually submit a SQL statement from scratch to the gateway.

Running a SQL Statement with the Flink SQL Gateway

In essence, the minimal steps are as follows. You can see the docs for more info.

  1. Establish a Session (with optional configuration parameters set).
  2. Submit a SQL Statement, which generates an Operation.
  3. Check the status of the Operation until it's complete.
  4. Fetch the results of the Operation.

Here's how to do each one, using HTTPie as an example client and showing the response. I'm using bash variables to hold the values of session and operation handles.

0. Check the connection and Flink version

$ http --body --follow --timeout 3600 GET 'localhost:8083/info' \
 Accept:'application/json'
{
    "productName": "Apache Flink",
    "version": "1.18.1"
}

1. Create a session

POST /sessions

$ printf '{
  "properties": {
    "execution.runtime-mode": "batch"
  }
}'| http  --follow --timeout 3600 POST 'localhost:8083/sessions' \
 Content-Type:'application/json' \
 Accept:'application/json'

HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 56
content-type: application/json; charset=UTF-8

{
    "sessionHandle": "e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28"
}

$ export SESSIONHANDLE="e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28"

[Optional] Validate session and read session config

GET /sessions/:session_handle

Note here that <span class="inline-code">execution.runtime-mode</span> has been set from the <span class="inline-code">properties</span> that were passed above when the session was created.

$ http --ignore-stdin --form --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE \
 Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 2129
content-type: application/json; charset=UTF-8
{
    "properties": {
        "env.java.opts.all": "--add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNN
AMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=AL
L-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-op
ens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/ja
va.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.ut
il.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED",
        "execution.attached": "true",
        "execution.runtime-mode": "batch",
        "execution.savepoint-restore-mode": "NO_CLAIM",
        "execution.savepoint.ignore-unclaimed-state": "false",
        "execution.shutdown-on-attached-exit": "false",
        "execution.target": "remote",
        "jobmanager.bind-host": "localhost",
        "jobmanager.execution.failover-strategy": "region",
        "jobmanager.memory.process.size": "1600m",
        "jobmanager.rpc.address": "localhost",
        "jobmanager.rpc.port": "6123",
        "parallelism.default": "1",
        "pipeline.classpaths": "",
        "pipeline.jars": "file:/Users/rmoff/flink/flink-1.18.1/opt/flink-python-1.18.1.jar",
        "rest.address": "localhost",
        "rest.bind-address": "localhost",
        "sql-gateway.endpoint.rest.address": "localhost",
        "table.catalog-store.file.path": "./conf/catalogs",
        "table.catalog-store.kind": "file",
        "table.resources.download-dir": "/var/folders/7x/nscwrz557vlcd_ydgt7d5wt00000gn/T/sql-gateway-e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28",
        "taskmanager.bind-host": "localhost",
        "taskmanager.host": "localhost",
        "taskmanager.memory.process.size": "1728m",
        "taskmanager.numberOfTaskSlots": "1"
    }
}
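
That's a lot of config to wade through. If you just want to confirm a single property, you can pull it out with jq (a quick sketch, assuming you have jq installed; we'll lean on it again in the script below):

$ http --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE \
 Accept:'application/json' | jq -r '.properties["execution.runtime-mode"]'
batch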

2. Submit a SQL statement

POST /sessions/:session_handle/statements

$ printf '{
  "statement": "CREATE  TABLE t_foo  WITH ( '\''connector'\'' = '\''filesystem'\'', '\''path'\'' = '\''file:///tmp/flink-test'\'', '\''format'\'' = '\''csv'\'', '\''csv.field-delimiter'\'' = '\'','\'' ) AS SELECT name, COUNT(*) AS cnt FROM (VALUES ('\''Bob'\''), ('\''Alice'\''), ('\''Greg'\''), ('\''Bob'\'')) AS NameTable(name) GROUP BY name;"
}'| http  --follow --timeout 3600 POST 'localhost:8083/sessions/'$SESSIONHANDLE'/statements' \
 Content-Type:'application/json' \
 Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 58
content-type: application/json; charset=UTF-8

{
    "operationHandle": "ba45649c-07b2-4b1c-a190-df3631b53549"
}

$ export OPERATIONHANDLE="ba45649c-07b2-4b1c-a190-df3631b53549"

3. Get Statement Execution Status

GET /sessions/:session_handle/operations/:operation_handle/status

$ http --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/status' \
 Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 21
content-type: application/json; charset=UTF-8

{
    "status": "FINISHED"
}

4. Get Results

GET /sessions/:session_handle/operations/:operation_handle/result/:token

$ http --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/result/0?rowFormat=JSON' \
 Accept:'application/json'

HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 483
content-type: application/json; charset=UTF-8
{
  "resultType": "PAYLOAD",
  "isQueryResult": false,
  "jobID": "fb1a5f06643364bc82a9a4e0bd3e9c10",
  "resultKind": "SUCCESS_WITH_CONTENT",
  "results": {
    "columns": [
      {
        "name": "job id",
        "logicalType": {
          "type": "VARCHAR",
          "nullable": true,
          "length": 2147483647
        },
        "comment": null
      }
    ],
    "rowFormat": "JSON",
    "data": [
      {
        "kind": "INSERT",
        "fields": [
          "fb1a5f06643364bc82a9a4e0bd3e9c10"
        ]
      }
    ]
  },
  "nextResultUri": "/v2/sessions/41ec5bb8-3574-4c6b-9b47-7bf9aa021ccc/operations/9bb84ff8-89a6-4f94-8dcc-e9125091c63b/result/1?rowFormat=JSON"
}

Because <span class="inline-code">resultType</span> is not <span class="inline-code">EOS</span> and there's a value for <span class="inline-code">nextResultUri</span>, we know there's more to fetch, at the location specified in <span class="inline-code">nextResultUri</span>.
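
Note that the returned URI is relative to the gateway host and already includes the <span class="inline-code">/v2</span> prefix, so you can feed it straight back to the server. A sketch of doing that by hand with jq (<span class="inline-code">NEXT_URI</span> is just an illustrative variable name):

$ NEXT_URI=$(http --follow GET 'localhost:8083/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/result/0?rowFormat=JSON' | jq -r '.nextResultUri')
$ http --follow GET 'localhost:8083'$NEXT_URI Accept:'application/json'

Once everything has been fetched, you get back a response with a <span class="inline-code">resultType</span> of <span class="inline-code">EOS</span> and no <span class="inline-code">nextResultUri</span>: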

{
  "resultType": "EOS",
  "isQueryResult": false,
  "jobID": "fb1a5f06643364bc82a9a4e0bd3e9c10",
  "resultKind": "SUCCESS_WITH_CONTENT",
  "results": {
    "columns": [
      {
        "name": "job id",
        "logicalType": {
          "type": "VARCHAR",
          "nullable": true,
          "length": 2147483647
        },
        "comment": null
      }
    ],
    "rowFormat": "JSON",
    "data": []
  }
}

5. Tidy up

Good practice is to close the session once you've finished with it:

$ http --follow --timeout 3600 DELETE 'localhost:8083/sessions/'$SESSIONHANDLE \
 Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 19
content-type: application/json; charset=UTF-8

{
    "status": "CLOSED"
}
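
By the way, if you'd rather use curl than HTTPie, the same flow translates directly. Here's a rough sketch of the happy path (create a session, submit a statement, fetch the first page of results, close the session), with the status polling omitted for brevity. The endpoints and payloads are exactly as above; <span class="inline-code">SELECT 42;</span> is just a stand-in statement:

HOST='localhost:8083'

# Create a session and capture its handle
SESSIONHANDLE=$(curl -s -X POST "$HOST/sessions" \
  -H 'Content-Type: application/json' \
  -d '{"properties": {"execution.runtime-mode": "batch"}}' | jq -r '.sessionHandle')

# Submit a statement and capture the operation handle
OPERATIONHANDLE=$(curl -s -X POST "$HOST/sessions/$SESSIONHANDLE/statements" \
  -H 'Content-Type: application/json' \
  -d '{"statement": "SELECT 42;"}' | jq -r '.operationHandle')

# Fetch the first page of results (in real use, poll the status endpoint until FINISHED first)
curl -s "$HOST/sessions/$SESSIONHANDLE/operations/$OPERATIONHANDLE/result/0?rowFormat=JSON" | jq '.'

# Close the session
curl -s -X DELETE "$HOST/sessions/$SESSIONHANDLE"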

Using a Shell Script to Execute Flink SQL

We can use all of the above and a bit of bash to script this:

#!/usr/bin/env bash
# Submit the SQL in rmoff.sql to the Flink SQL Gateway and fetch the results.

host='localhost:8083'

# Create a session, capturing the sessionHandle from the response
SESSIONHANDLE=$(printf '{
  "properties": {
    "execution.runtime-mode": "batch"
  }
}'| http --follow --timeout 3600 POST $host'/sessions' \
 Content-Type:'application/json' \
 Accept:'application/json' | jq -r '.sessionHandle')

echo "Got session handle: $SESSIONHANDLE"

# Flatten the SQL file to a single line so that it embeds cleanly in the JSON
# payload. (Note: a double quote in the SQL would break the JSON; the SQL here
# uses only single quotes.)
SQL_STATEMENT_ONE_LINE=$(tr '\n' ' ' < rmoff.sql)

OPERATIONHANDLE=$(printf '{
  "statement": "%s"
}' "$SQL_STATEMENT_ONE_LINE" | http --follow --timeout 3600 POST $host'/sessions/'$SESSIONHANDLE'/statements' \
 Content-Type:'application/json' \
 Accept:'application/json' | jq -r '.operationHandle')

echo "Got operation handle: $OPERATIONHANDLE"

# Poll the status every two seconds until the operation is no longer RUNNING
# (it should end up FINISHED, or ERROR if the statement failed)
while true
do
  STATUS=$(http --follow --timeout 3600 GET $host'/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/status' \
 Accept:'application/json' | jq -r '.status')
  echo "$STATUS"
  if [ "$STATUS" != "RUNNING" ]; then
    break
  fi
  sleep 2
done

# Fetch each page of results, following nextResultUri until there is none left
echo -e "\n\n----- 📃 RESULTS 📃 -----\n"
URL='/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/result/0?rowFormat=JSON'
while true
do
  RESULT=$(http --follow --timeout 3600 GET $host$URL \
   Accept:'application/json')
  echo "$RESULT" | jq '.'
  URL=$(echo "$RESULT" | jq -r '.nextResultUri // ""')
  if [ -z "$URL" ]; then
    break
  fi
  echo "(next result chunk 👇)"
done

# Tidy up by closing the session
echo "Closing session 🗑️"
http --follow --timeout 3600 DELETE $host'/sessions/'$SESSIONHANDLE

We'll put the actual SQL into a file called <span class="inline-code">rmoff.sql</span>:

CREATE TABLE t_foo WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/flink-test',
  'format' = 'csv',
  'csv.field-delimiter' = ','
) AS SELECT name, COUNT(*) AS cnt FROM (
  VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')
) AS NameTable(name) GROUP BY name;

Now when we run the shell script, we get this:

Got session handle: 8d7dc671-d7aa-4ddb-ba04-706b0311aa69
Got operation handle: 3aa41360-bd21-453a-a759-b54db69c81ae
RUNNING
FINISHED


----- 📃 RESULTS 📃 -----

{
  "resultType": "PAYLOAD",
  "isQueryResult": false,
  "jobID": "615365befee24c53d1efa195f9d72eee",
  "resultKind": "SUCCESS_WITH_CONTENT",
  "results": {
    "columns": [
      {
        "name": "job id",
        "logicalType": {
          "type": "VARCHAR",
          "nullable": true,
          "length": 2147483647
        },
        "comment": null
      }
    ],
    "rowFormat": "JSON",
    "data": [
      {
        "kind": "INSERT",
        "fields": [
          "615365befee24c53d1efa195f9d72eee"
        ]
      }
    ]
  },
  "nextResultUri": "/v2/sessions/8d7dc671-d7aa-4ddb-ba04-706b0311aa69/operations/3aa41360-bd21-453a-a759-b54db69c81ae/result/1?rowFormat=JSON"
}
(next result chunk 👇)
{
  "resultType": "EOS",
  "isQueryResult": false,
  "jobID": "615365befee24c53d1efa195f9d72eee",
  "resultKind": "SUCCESS_WITH_CONTENT",
  "results": {
    "columns": [
      {
        "name": "job id",
        "logicalType": {
          "type": "VARCHAR",
          "nullable": true,
          "length": 2147483647
        },
        "comment": null
      }
    ],
    "rowFormat": "JSON",
    "data": []
  }
}
Closing session 🗑️
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 19
content-type: application/json; charset=UTF-8

{
    "status": "CLOSED"
}

The SQL we ran wrote a CSV file under <span class="inline-code">/tmp/flink-test</span>, so let's go and check that it worked:

$ ls -lrt /tmp/flink-test && cat /tmp/flink-test/*
-rw-r--r--@ 1 rmoff  wheel  21  7 Mar 18:07 part-f50c05ae-e39e-40c1-8b00-b1a1ebfced0d-task-0-file-0
Alice,1
Bob,2
Greg,1

Nice: it's exactly as expected.

Where Next?

If you want to learn more about Flink SQL you might be interested in understanding more about the role of the Catalog, hands-on examples of using the Catalog, or a deep dive into using JARs with Flink SQL.

You might also be interested in trying Decodable, which provides a fully managed Apache Flink and Debezium service. Our CLI and API both support deployment of pipelines with Flink SQL.

Robin Moffatt

Robin is a Principal DevEx Engineer at Decodable. He has been speaking at conferences since 2009 including QCon, Devoxx, Strata, Kafka Summit, and Øredev. You can find many of his talks online and his articles on the Decodable blog as well as his own blog.

Outside of work, Robin enjoys running, drinking good beer, and eating fried breakfasts—although generally not at the same time.
