January 21, 2025 · 12 min read

Get Running with Apache Flink on Kubernetes, part 1 of 2

By Gunnar Morling

Kubernetes is a widely used deployment platform for Apache Flink. While Flink has had native support for Kubernetes for quite a while, it is the operator pattern in particular that makes deploying Flink jobs onto Kubernetes clusters a compelling option: you define jobs in a declarative resource, and a control loop running in a component called a Kubernetes operator takes care of provisioning and maintaining (e.g. scaling, updating) all the required resources. Automation is the keyword here, significantly reducing the manual effort required for running Flink jobs in production.

There are multiple Kubernetes operators for Flink, including Lyft’s flinkk8soperator, flink-on-k8s-operator by Spotify (a fork of a now unmaintained operator initially developed by Google), and flink-controller by Andrea Medeghini. The most widely used operator these days, though, is the upstream Flink Kubernetes Operator, developed under the umbrella of the Apache Flink project itself, and it is the focus of this two-part blog post series. The series gives you an overview of what it takes to install the operator and run your first jobs on Kubernetes, and discusses what’s missing to create a complete Flink-based data platform for an organization.

In part one, we’ll touch on installation and setup, deploying Flink jobs via custom resources, and creating container images for your own Flink jobs. Part two covers fault tolerance and high availability, savepoint management, observability, and UI access.

The idea for these posts is to give you a fully runnable example so you can try out everything yourself. To follow along, grab the complete source code in the Decodable examples repository on GitHub:

$ git clone
$ cd flink-on-kubernetes

You’ll need the following prerequisites:

  • Docker, for running and building container images
  • kind, for setting up a local Kubernetes cluster, and kubectl for interacting with it
  • Helm, for installing the Kubernetes operator

Fig. 1 gives an overview of the solution we are going to build in this blog series, with all the key resources and most relevant interactions:

Fig. 1: Solution overview

Installation and Setup

Start by creating a local Kubernetes cluster using <span class="inline-code">kind</span>:

$ kind create cluster --name my-cluster --config kind-cluster.yml

The cluster contains a control plane as well as a worker node. The kind configuration makes the directory /tmp/kind-worker available to the worker node of the cluster, which will allow you to keep Flink savepoints when rebuilding the kind cluster. It also maps ports 80 and 443 of the worker node to the host, which will allow you to access the Flink web UI via a Kubernetes ingress later on:

kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
 - role: control-plane
 - role: worker
   extraMounts:
   - hostPath: /tmp/kind-worker
     containerPath: /files
   extraPortMappings:
   - containerPort: 80
     hostPort: 80
     protocol: TCP
   - containerPort: 443
     hostPort: 443
     protocol: TCP

<div style="text-align: center">Listing 1: kind-cluster.yml for spinning up a local Kubernetes cluster</div>

Verify the cluster is running:

$ kubectl get nodes
NAME                       STATUS   ROLES           AGE     VERSION
my-cluster-control-plane   Ready    control-plane   2d18h   v1.29.2
my-cluster-worker          Ready    <none>          2d18h   v1.29.2

Installing the Flink Kubernetes operator is straightforward thanks to the provided Helm chart. Add the Helm repository for the current version 1.10 (released in October 2024) of the operator like so:

$ helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/

Before installing the operator itself, you need to install cert-manager (required for issuing the TLS certificates used by the operator’s admission controller):

$ kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml

The operator Helm chart can be customized upon installation with a wide set of parameters, for instance to control which Kubernetes namespaces the operator should observe, which images should be used by default for deploying Flink jobs, etc. We’re going to make the following adjustments:

  • Inject the configuration required for running Flink jobs on Java 17; while Flink itself has supported Java 17 since version 1.18, the operator in version 1.10 doesn’t pass the <span class="inline-code">--add-opens</span> flags required to run Flink on Java 17 and newer. This has been added recently (see FLINK-36646), but no release of the operator with those changes is available as of this writing
  • Shorten the lifetime of Flink job managers after a job has been suspended; this setting, <span class="inline-code">kubernetes.operator.jm-deployment.shutdown-ttl</span>, defaults to one day, which means a superfluous job manager pod would linger in your cluster for quite a while after suspending a job

The Helm values file with these configuration amendments looks like so:

defaultConfiguration:
  create: true
  append: true
  flink-conf.yaml: |+
    kubernetes.operator.default-configuration.flink-version.v1_20.env.java.default-opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED

    kubernetes.operator.jm-deployment.shutdown-ttl: 10000

<div style="text-align: center">Listing 2: helm-values.yaml for customizing the operator configuration</div>

Install the Helm chart, using this values file:

$ helm install -f helm-values.yaml flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

This should yield the following output:

NAME: flink-kubernetes-operator
LAST DEPLOYED: Mon Jan 20 17:02:43 2025
NAMESPACE: default
STATUS: deployed
REVISION: 1
TEST SUITE: None

<div class="side-note">The Helm chart creates two Kubernetes roles and associated service accounts: <span class="inline-code">flink-operator</span>, used by the Flink operator for managing Flink deployments, and <span class="inline-code">flink</span>, used by the Flink job manager for spawning task manager pods. By default, the former is cluster-scoped, while the latter is namespace-scoped. In the following, we are going to deploy Flink jobs in the default namespace; if you want to deploy jobs in other namespaces, refer to the operator’s RBAC documentation to learn how to create the required role and service account.</div>
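For instance, if you want the operator to watch several namespaces, the Helm chart exposes a <span class="inline-code">watchNamespaces</span> value for this; a minimal sketch (the <span class="inline-code">team-a</span>/<span class="inline-code">team-b</span> namespace names are placeholders for illustration) could look like this:

```yaml
# helm-values.yaml (excerpt): have the operator observe specific namespaces only;
# "team-a" and "team-b" are hypothetical namespace names
watchNamespaces:
  - default
  - team-a
  - team-b
```

Remember that for each additional namespace, the namespace-scoped <span class="inline-code">flink</span> role and service account must exist as well, as described in the RBAC documentation.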

Next, we are going to create a deployment of MinIO to have an S3 backend for persisting Flink checkpoints and savepoints as well as Flink high availability data, independent of the lifetime of specific Flink pods. The MinIO deployment is taken pretty much verbatim from the upstream Kubernetes examples, except that the volume for persisting the MinIO data is backed by the aforementioned kind extra mount:

$ kubectl apply -f storage/

Create a bucket named <span class="inline-code">flink-data</span>:

$ kubectl -n default run minio-client --image=minio/mc:latest --restart=Never --command=true -- sh -c "mc config host add minio http://minio-service.default.svc.cluster.local:9000 minio minio123 && mc mb minio/flink-data"

This concludes the installation steps and we are ready to run our first Flink job on Kubernetes.

Deployment Types

Following the Kubernetes operator pattern, Flink jobs are deployed as custom resources, whose types are defined via custom resource definitions (CRDs). This means you define a job and its configuration in a resource descriptor (a YAML file), and the operator takes care of creating and managing all the required Kubernetes resources, such as pods for job and task managers, config maps for job configuration, ingresses for accessing the Flink web UI, etc. If you make changes to a job’s definition (or delete it), the operator will perform the steps required to keep the deployed resources in sync with the resource definition.

The Flink Kubernetes operator supports running Flink in both application and session mode, with dedicated resource definitions for each mode. In application mode, each job is executed in its own Flink cluster, i.e. with its own job manager. When the job is finished, that cluster will be shut down. The <span class="inline-code">FlinkDeployment</span> resource type is used for running jobs in application mode.

In contrast, Flink’s session mode utilizes a long-running Flink cluster onto which multiple jobs are deployed, using the <span class="inline-code">FlinkSessionJob</span> resource type. Session mode can be more resource-efficient due to the shared job manager, at the cost of reduced isolation between jobs.
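For reference, a session job definition is a rather compact resource; a minimal sketch (the session cluster name and JAR location are hypothetical) might look like this:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: session-job-example
spec:
  # name of a FlinkDeployment without a job section, acting as the session cluster
  deploymentName: my-session-cluster
  job:
    jarURI: https://example.com/artifacts/my-job.jar  # hypothetical artifact location
    parallelism: 2
    upgradeMode: stateless
```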

In the following, we are going to use application mode, i.e., a dedicated Flink cluster per job.

Sticking to the recommendations from the operator documentation, we’ll deploy Flink using the Native Kubernetes resource provider, which means that Flink itself talks directly to the Kubernetes API to orchestrate resources such as task manager pods. In particular when deploying jobs of unknown provenance, it can make sense to use the Standalone provider instead, in which case Flink jobs don’t have access to the Kubernetes API; instead, the operator manages all the required resources.
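Choosing between the two providers is a matter of the <span class="inline-code">mode</span> field in the <span class="inline-code">FlinkDeployment</span> spec; a sketch of the relevant excerpt:

```yaml
# FlinkDeployment excerpt: with "standalone", the operator creates all pods itself
# and the Flink job gets no access to the Kubernetes API; the default is "native"
spec:
  mode: standalone
```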

Deploying Your First Flink Job on Kubernetes

The operator’s repository on GitHub contains a number of very useful example resource definitions. Slightly adjusted to run with Flink 1.20 on Java 17, the most basic <span class="inline-code">FlinkDeployment</span> definition for running one of the example jobs shipping with Flink looks like this:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.20-java17
  flinkVersion: v1_20
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless

<div style="text-align: center">Listing 3: Resource definition basic.yaml for a first Flink job</div>

This runs the StateMachineExample.jar job on Flink 1.20 with a parallelism of 2, assigning 1 CPU core and 2 GB of RAM to each of the job manager and task manager pods. No execution state will be retained between job runs (upgrade mode <span class="inline-code">stateless</span>). To deploy this job, apply the resource definition using <span class="inline-code">kubectl</span>:

$ kubectl apply -f flink/basic.yaml

As the operator exposes Flink deployments just like any other kind of Kubernetes resource, you can examine their status with kubectl, too (output adjusted for readability):

$ kubectl describe FlinkDeployment basic-example
Name:         basic-example
Namespace:    default
Labels:       <none>
Annotations:  <none>
API Version:  flink.apache.org/v1beta1
Kind:         FlinkDeployment
Metadata:
  Creation Timestamp:  2025-01-15T09:58:26Z
  Finalizers:
    flinkdeployments.flink.apache.org/finalizer
  Generation:  2
  Managed Fields:
    API Version:  flink.apache.org/v1beta1
    ...
  Resource Version:  16235
  UID:               470322f2-2658-48d0-bd86-7473816a5e40
Spec:
  Flink Configuration:
    taskmanager.numberOfTaskSlots:  2
  Flink Version:                    v1_20
  Image:                            flink:1.20
  Job:
    Args:
    Jar URI:       local:///opt/flink/examples/streaming/StateMachineExample.jar
    Parallelism:   4
    State:         running
    Upgrade Mode:  stateless
  Job Manager:
    Replicas:  1
    Resource:
      Cpu:          1
      Memory:       2048m
  Service Account:  flink
  Task Manager:
    Resource:
      Cpu:     1
      Memory:  2048m
Status:
  Cluster Info:
    Flink - Revision:             b1fe7b4 @ 2024-07-25T04:22:22+02:00
    Flink - Version:              1.20.0
    Total - Cpu:                  3.0
    Total - Memory:               6442450944
  Job Manager Deployment Status:  READY
  Job Status:
    Checkpoint Info:
      Last Periodic Checkpoint Timestamp:  0
    Job Id:                                0f646b467710dd8f39e856f4a892be1d
    Job Name:                              State machine job
    Savepoint Info:
      Last Periodic Savepoint Timestamp:  0
      Savepoint History:
    Start Time:         1736935168717
    State:              RUNNING
    Update Time:        1736935179288
  Lifecycle State:      STABLE
  Observed Generation:  2
  Reconciliation Status:
    Last Reconciled Spec:     {"spec":{...}} 
    Last Stable Spec:         {"spec":{...}} 
    Reconciliation Timestamp:  1736935106418
    State:                     DEPLOYED
  Task Manager:
    Label Selector:  component=taskmanager,app=basic-example
    Replicas:        2
Events:
  Type    Reason            Age    From                  Message
  ----    ------            ----   ----                  -------
  Normal  Submit            6m28s  JobManagerDeployment  Starting deployment
  Normal  JobStatusChanged  5m15s  Job                   Job status changed from RECONCILING to RUNNING

When deploying a job, the operator also creates a Kubernetes Service for exposing the Flink web UI. For testing purposes, you can forward its port, allowing you to access the UI on http://localhost:8081:

$ kubectl port-forward service/basic-example-rest 8081

Fig. 2: The Flink web UI

Next, let’s make some changes to the job definition. Currently, the job runs with a parallelism of 2. As there are also two slots per task manager, a single task manager pod is running (besides the job manager pod) for the job:

$ kubectl get pods -l app=basic-example
NAME                             READY   STATUS    RESTARTS   AGE
basic-example-5d9dd5d5c4-h42jd   1/1     Running   0          14s
basic-example-taskmanager-1-1    1/1     Running   0          6s

Patch the resource to change the parallelism to 4:

$ kubectl patch FlinkDeployment basic-example --patch '{"spec" : { "job" : { "parallelism" : 4 }}}' --type=merge

The operator will recognize the change and reconcile the deployment accordingly. Shortly thereafter, you’ll see another task manager pod running:

$ kubectl get pods -l app=basic-example
NAME                             READY   STATUS    RESTARTS   AGE
basic-example-6fcfd8464c-sm9vc   1/1     Running   0          77s
basic-example-taskmanager-1-1    1/1     Running   0          68s
basic-example-taskmanager-1-2    1/1     Running   0          68s

If you go back to the Flink web UI (recreate the port forwarding, if needed), you’ll also see that there are two task managers now, with four task slots overall:

Fig. 3: Updated job in the Flink web UI

Finally, to delete the job, run the following:

$ kubectl delete FlinkDeployment basic-example

Besides Java, you can also implement your stream processing jobs using Flink’s Python API, PyFlink. Refer to this post to learn how to get started with running your first PyFlink jobs on Kubernetes.

Building Custom Job Images

So far, we’ve run an example Flink job which is contained within the upstream Flink container image. In practice, of course, you’ll want to run your own Flink job JARs. Multiple options exist for doing so:

  • Adding an init container to the Flink pods which retrieves the JAR at start-up from a remote location such as an S3 bucket, as shown in this blog post
  • Retrieving the JAR from an HTTP server—for instance an internal Maven repository—by specifying an HTTP URI for the <span class="inline-code">jarURI</span> parameter; when using Flink 1.19 or later, the parameter also accepts S3 URIs, provided an S3 file system plug-in has been configured (see FLINK-28915)
  • Building a custom container image, extending the upstream Flink image and adding your job JAR file at build time
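For the second option on Flink 1.19 or later, the job section of the deployment could reference a remote artifact directly; a sketch (the bucket and file names are hypothetical, and an S3 file system plug-in must be configured):

```yaml
# FlinkDeployment excerpt: fetching the job JAR from S3 instead of the image;
# "my-artifact-bucket" is a placeholder
  job:
    jarURI: s3://my-artifact-bucket/flink-hello-world-0.1.jar
```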

While all these approaches do the trick, my personal preference and recommendation is the last one: building a custom image with your job, rather than fetching the JAR at runtime. This can help speed things up when restarting a job (as the image may be retrieved from a node-local image cache) and is generally more in line with the philosophy of immutable container images; for instance, when inspecting the image run by a pod, it’s immediately clear which version of the Flink job it is.
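A Dockerfile for this approach can be quite small; a sketch, assuming the job JAR is produced under <span class="inline-code">target/</span> by the build (the exact file name and target path may differ in your project):

```dockerfile
# Extend the upstream Flink image and bake the job JAR into it;
# JAR name and paths are assumptions for illustration
FROM flink:1.20-java17
COPY target/flink-hello-world-0.1.jar /opt/flink/examples/streaming/flink-hello-world-0.1.jar
```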

The example project for this post contains a small Flink job that prints out a sequence of numbers, using Flink’s built-in DataGen source. You can build a container image for this job via Docker like so:

$ docker build -t decodable-examples/hello-world-job:1.0 hello-world-job

While you’d normally push this image to an image registry such as ECR, here we’ll simply load it directly into the kind cluster:

$ kind load docker-image decodable-examples/hello-world-job:1.0 --name my-cluster

Next, we need to create a <span class="inline-code">FlinkDeployment</span> resource for this job. It is very similar to the one before, except that now we refer to our custom image containing the Flink job JAR file:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: custom-job
spec:
  image: decodable-examples/hello-world-job:1.0
  flinkVersion: v1_20
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/flink-hello-world-0.1.jar
    parallelism: 1
    upgradeMode: stateless

<div style="text-align: center">Listing 4: Resource definition custom-job.yaml for a custom Flink job</div>

Apply the descriptor:

$ kubectl apply -f flink/custom-job.yaml

Then verify the job status is running:

$ kubectl get FlinkDeployment custom-job
NAME         JOB STATUS   LIFECYCLE STATE
custom-job   RUNNING      STABLE

For further analysis, you can also take a look at the logs of the task manager pod like so:

$ kubectl logs -l app=custom-job,component=taskmanager -f
...
2025-01-20 11:07:40,323 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Generator Source -> Sink: Writer (1/1)#0 (d70586a2890af7b3459578ad30dcd550_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from DEPLOYING to INITIALIZING.
2025-01-20 11:07:40,365 INFO  org.apache.flink.runtime.state.StateBackendLoader            [] - State backend is set to heap memory org.apache.flink.runtime.state.hashmap.HashMapStateBackend@79439cec
2025-01-20 11:07:40,375 INFO  org.apache.flink.runtime.state.StateBackendLoader            [] - State backend is set to heap memory org.apache.flink.runtime.state.hashmap.HashMapStateBackend@177287da
2025-01-20 11:07:40,380 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Generator Source -> Sink: Writer (1/1)#0 (d70586a2890af7b3459578ad30dcd550_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from INITIALIZING to RUNNING.
Number: 0
Number: 1
Number: 2
...

This concludes part one of this blog post series. Head over to part two to learn about fault tolerance and high availability of your Flink jobs running on Kubernetes, savepoint management, observability, and more.

Gunnar Morling

Gunnar is an open-source enthusiast at heart, currently working on Apache Flink-based stream processing. In his prior role as a software engineer at Red Hat, he led the Debezium project, a distributed platform for change data capture. He is a Java Champion and has founded multiple open source projects such as JfrUnit, kcctl, and MapStruct.

Kubernetes is a widely used deployment platform for Apache Flink. While Flink has had native support for Kubernetes for quite a while, it is in particular the operator pattern which makes deploying Flink jobs onto Kubernetes clusters a  compelling option: you define jobs in a declarative resource, and a control loop running in a component called a Kubernetes operator takes care of provisioning and maintaining (e.g. scaling, updating) all the required resources. Automation is the keyword here, significantly reducing the manual effort required for running Flink jobs in production.

There are multiple Kubernetes operators for Flink, including Lyft’s flinkk8soperator, flink-on-k8s-operator by Spotify (a fork of a now unmaintained operator initially developed by Google), and flink-controller by Andrea Medeghini. The most widely used operator these days though is the upstream Flink Kubernetes Operator, developed under the umbrella of the Apache Flink project itself, which is also the focus of this two-part blog post series. It gives you an overview on what it takes to install the operator and run your first jobs on Kubernetes, and discuss what’s missing to create a complete Flink-based data platform for an organization.

In part one, we’ll touch on installation and setup, deploying Flink jobs via custom resources and creating container images for your own Flink jobs. Part two covers fault tolerance and high availability, savepoint management, observability, and then UI access.

The idea for these posts is to give you a fully runnable example so you can try out everything yourself. To follow along, grab the complete source code in the Decodable examples repository on GitHub:

$ git clone
$ cd flink-on-kubernetes

You’ll need the following prerequisites:

  • Docker, for running and building container images
  • kind, for setting up a local Kubernetes cluster, and kubectl for interacting with it
  • Helm, for installing the Kubernetes operator

Fig. 1 gives an overview of the solution we are going to build in this blog series, with all the key resources and most relevant interactions:

Fig. 1: Solution overview

Installation and Setup

Start by creating a local Kubernetes cluster using <span class="inline-code">kind</span>:

$ kind create cluster --name my-cluster --config kind-cluster.yml

The cluster contains a control plane as well as a worker node. The kind configuration makes the directory /tmp/kind-worker available to the worker node of the cluster, which will allow you to keep Flink savepoints when rebuilding the kind cluster. It also maps ports 80 and 443 of the worker node to the host, which will allow you to access the Flink web UI via a Kubernetes ingress later on:

kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
 - role: control-plane
 - role: worker
   extraMounts:
   - hostPath: /tmp/kind-worker
     containerPath: /files
   extraPortMappings:
   - containerPort: 80
     hostPort: 80
     protocol: TCP
   - containerPort: 443
     hostPort: 443
     protocol: TCP

‍<div style="text-align: center">Listing 1: kind-cluster.yml for spinning up a local Kubernetes cluster</div>

Verify the cluster is running:

$ kubectl get nodes
NAME                       STATUS   ROLES           AGE     VERSION
my-cluster-control-plane   Ready    control-plane   2d18h   v1.29.2
my-cluster-worker          Ready    <none>          2d18h   v1.29.2

Installing the Flink Kubernetes operator is straight-forward thanks to the provided Helm chart. Add the Helm repository for the current version 1.10 (released in October 2024) of the operator like so:

$ helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/

Before installing the operator itself, it is required to install the Kubernetes cert-manager (needed for issuing TLS certificates used by the operator’s admission controller):

$ kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml

The operator Helm chart can be customized upon installation with a wide set of parameters, for instance to control which Kubernetes namespaces the operator should observe, which images should be used by default for deploying Flink jobs, etc. We’re gonna make the following adjustments:

  • Inject the configuration required for running Flink jobs on Java 17; while Flink itself provides support for Java 17 since version 1.18, the operator in version 1.10 doesn’t pass the <span class="inline-code">--add-opens</span> flags required to run Flink on Java 17 and newer. This has been added recently (see FLINK-36646), but no release of the operator with those changes is available as of now
  • Shorten the lifetime of Flink job managers after a job has been suspended; this setting, <span class="inline-code">kubernetes.operator.jm-deployment.shutdown-ttl</span>, defaults to one day, which means you’ll have a superfluous job manager pod in your cluster for quite a while when suspending a job

The Helm values file with these configuration amendments looks like so:

defaultConfiguration:
  create: true
  append: true
  flink-conf.yaml: |+
    kubernetes.operator.default-configuration.flink-version.v1_20.env.java.default-opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED

    kubernetes.operator.jm-deployment.shutdown-ttl: 10000

‍<div style="text-align: center">Listing 2: helm-values.yaml for customizing the operator configuration</div>

Install the Helm chart, using this values file:

$ helm install -f helm-values.yaml flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

This should yield the following output:

NAME: flink-kubernetes-operator
LAST DEPLOYED: Mon Jan 20 17:02:43 2025
NAMESPACE: default
STATUS: deployed
REVISION: 1
TEST SUITE: None

<div class="side-note">The Helm chart creates two Kubernetes roles and associated service accounts: <span class="inline-code">flink-operator</span>, used by the Flink operator for managing Flink deployments, and <span class="inline-code">flink</span>, used by the Flink job manager for spawning task manager pods. By default, the former is cluster-scoped, while the latter is namespace-scoped. In the following, we are going to deploy Flink jobs in the default namespace; if you want to deploy jobs in other namespaces, refer to the operator’s RBAC documentation to learn how to create the required role and service account.</div>

Next, we are going to create a deployment of MinIO to have an S3 backend for persisting Flink checkpoints and savepoints as well as Flink high availability data, independent of the lifetime of specific Flink pods. The MinIO deployment is taken pretty much verbatim from the upstream Kubernetes examples, only that the volume for persisting the MinIO data is backed by the aforementioned Kind extra mount:

$ kubectl apply -f storage/

Create a bucket named <span class="inline-code">flink-data</span>:

$ kubectl -n default run minio-client --image=minio/mc:latest --restart=Never --command=true -- sh -c "mc config host add minio http://minio-service.default.svc.cluster.local:9000 minio minio123 && mc mb minio/flink-data"

This concludes the installation steps and we are ready to run our first Flink job on Kubernetes.

Deployment Types

Following the Kubernetes operator pattern, Flink jobs are deployed using custom Kubernetes resource definitions (CRDs). This means you define a job and its configuration in a resource descriptor (a YAML file), and the operator will take care of creating and managing all the required Kubernetes resources, such as pods for job and task managers, config maps for job configuration, ingresses for accessing the Flink web UI, etc. If you make changes to a job’s definition (or delete it), the operator will perform the steps required to keep the deployed resources in sync with the resource definition.

The Flink Kubernetes operator supports running Flink in both application and session mode, with dedicated resource definitions for each mode. In application mode, each job is executed in its own Flink cluster, i.e. with its own job manager. When the job is finished, that cluster will be shut down. The <span class="inline-code">FlinkDeployment</span> resource type is used for running jobs in application mode.

In contrast, Flink’s session mode utilizes a long-running Flink cluster onto which multiple jobs are deployed, using the <span class="inline-code">FlinkSessionJob</span> resource type. Session mode can be more resource-efficient due to the shared job manager, at the cost of reduced isolation between jobs.

In the following, we are going to use application mode, i.e., a dedicated Flink cluster per job.

Sticking to the recommendations from the operator documentation, we’ll deploy Flink using the Native Kubernetes resource provider, which means that Flink will directly talk to the Kubernetes API to orchestrate resources such as task manager pods. In particular when deploying jobs of unknown provenience, it can make sense to use the Standalone provider instead, in which case Flink jobs don’t have access to the Kubernetes API, but instead the operator is managing all the required resources.

Deploying Your First Flink Job on Kubernetes

The operator’s repository on GitHub contains a number of very useful example resource definitions. Slightly adjusted to run with Flink 1.20 on Java 17, the most basic <span class="inline-code">FlinkDeployment</span> definition for running one of the example jobs shipping with Flink looks like this:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.20-java17
  flinkVersion: v1_20
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless

‍<div style="text-align: center">Listing 3: Resource definition basic.yaml for a first Flink job</div>

This runs the StateMachineExample.jar job on Flink 1.20 with a parallelism of 2, assigning 1 CPU core and 2 GB of RAM each to the job manager and task manager pods. No execution state will be retained between job runs (upgrade mode <span class="inline-code">stateless</span>). To deploy this job, apply the resource definition using <span class="inline-code">kubectl</span>:

$ kubectl apply -f flink/basic.yaml

As the operator exposes Flink deployments just like any other kind of Kubernetes resource, you can examine their status with kubectl, too (output adjusted for readability):

$ kubectl describe FlinkDeployment basic-example
Name:         basic-example
Namespace:    default
Labels:       <none>
Annotations:  <none>
API Version:  flink.apache.org/v1beta1
Kind:         FlinkDeployment
Metadata:
  Creation Timestamp:  2025-01-15T09:58:26Z
  Finalizers:
    flinkdeployments.flink.apache.org/finalizer
  Generation:  2
  Managed Fields:
    API Version:  flink.apache.org/v1beta1
    ...
  Resource Version:  16235
  UID:               470322f2-2658-48d0-bd86-7473816a5e40
Spec:
  Flink Configuration:
    taskmanager.numberOfTaskSlots:  2
  Flink Version:                    v1_20
  Image:                            flink:1.20-java17
  Job:
    Args:
    Jar URI:       local:///opt/flink/examples/streaming/StateMachineExample.jar
    Parallelism:   2
    State:         running
    Upgrade Mode:  stateless
  Job Manager:
    Replicas:  1
    Resource:
      Cpu:          1
      Memory:       2048m
  Service Account:  flink
  Task Manager:
    Resource:
      Cpu:     1
      Memory:  2048m
Status:
  Cluster Info:
    Flink - Revision:             b1fe7b4 @ 2024-07-25T04:22:22+02:00
    Flink - Version:              1.20.0
    Total - Cpu:                  2.0
    Total - Memory:               4294967296
  Job Manager Deployment Status:  READY
  Job Status:
    Checkpoint Info:
      Last Periodic Checkpoint Timestamp:  0
    Job Id:                                0f646b467710dd8f39e856f4a892be1d
    Job Name:                              State machine job
    Savepoint Info:
      Last Periodic Savepoint Timestamp:  0
      Savepoint History:
    Start Time:         1736935168717
    State:              RUNNING
    Update Time:        1736935179288
  Lifecycle State:      STABLE
  Observed Generation:  2
  Reconciliation Status:
    Last Reconciled Spec:     {"spec":{...}} 
    Last Stable Spec:         {"spec":{...}} 
    Reconciliation Timestamp:  1736935106418
    State:                     DEPLOYED
  Task Manager:
    Label Selector:  component=taskmanager,app=basic-example
    Replicas:        2
Events:
  Type    Reason            Age    From                  Message
  ----    ------            ----   ----                  -------
  Normal  Submit            6m28s  JobManagerDeployment  Starting deployment
  Normal  JobStatusChanged  5m15s  Job                   Job status changed from RECONCILING to RUNNING

When deploying a job, the operator also creates a Kubernetes Service for exposing the Flink web UI. For testing purposes, you can forward its port, allowing you to access the UI on http://localhost:8081:

$ kubectl port-forward service/basic-example-rest 8081
Fig. 2: The Flink web UI

Next, let’s make some changes to the job definition. Currently, the job runs with a parallelism of 2. As there are also two slots per task manager, a single task manager pod is running (besides the job manager pod) for the job:

$ kubectl get pods -l app=basic-example
NAME                             READY   STATUS    RESTARTS   AGE
basic-example-5d9dd5d5c4-h42jd   1/1     Running   0          14s
basic-example-taskmanager-1-1    1/1     Running   0          6s

Patch the resource to change the parallelism to 4:

$ kubectl patch FlinkDeployment basic-example --patch '{"spec" : { "job" : { "parallelism" : 4 }}}' --type=merge

The operator will recognize the change and reconcile the deployment accordingly. Shortly thereafter, you’ll see another task manager pod running:

$ kubectl get pods -l app=basic-example
NAME                             READY   STATUS    RESTARTS   AGE
basic-example-6fcfd8464c-sm9vc   1/1     Running   0          77s
basic-example-taskmanager-1-1    1/1     Running   0          68s
basic-example-taskmanager-1-2    1/1     Running   0          68s

If you go back to the Flink web UI (recreate the port forwarding, if needed), you’ll also see that there are two task managers now, with four task slots overall:

Fig. 3: Updated job in the Flink web UI

Finally, to delete the job, run the following:

$ kubectl delete FlinkDeployment basic-example

Besides Java, you can also implement your stream processing jobs using Flink’s Python API, PyFlink. Refer to this post to learn how to get started running your first PyFlink jobs on Kubernetes.

Building Custom Job Images

So far, we’ve run an example Flink job which is contained within the upstream Flink container image. In practice, of course, you’ll want to run your own Flink job JARs. Multiple options exist for doing so:

  • Adding an init container to the Flink pods which retrieves the JAR at start-up from a remote location such as an S3 bucket, as shown in this blog post
  • Retrieving the JAR from an HTTP server—for instance an internal Maven repository—by specifying an HTTP URI for the <span class="inline-code">jarURI</span> parameter; when using Flink 1.19 or later, the parameter also accepts S3 URIs, provided an S3 file system plug-in has been configured (see FLINK-28915)
  • Building a custom container image, extending the upstream Flink image and adding your job JAR file at build time
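As an illustration of the second option, here is a fragment of a <span class="inline-code">FlinkDeployment</span> fetching the job JAR from S3. The bucket name, JAR path, and plug-in version are placeholders; <span class="inline-code">ENABLE_BUILT_IN_PLUGINS</span> is the official Flink image’s mechanism for activating one of its bundled file system plug-ins:

```yaml
spec:
  podTemplate:
    spec:
      containers:
        # "flink-main-container" identifies Flink's main container
        - name: flink-main-container
          env:
            # Activate the S3 file system plug-in bundled with the image
            - name: ENABLE_BUILT_IN_PLUGINS
              value: flink-s3-fs-presto-1.20.0.jar
  job:
    # Remote URIs are supported for jarURI as of Flink 1.19 (FLINK-28915)
    jarURI: s3://my-bucket/jobs/my-job-1.0.jar
```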

While all these approaches do the trick, my personal preference and recommendation is the last one: building a custom image with your job rather than fetching the JAR at runtime. This can speed up job restarts (the image may be served from a node-local image cache) and generally is more in line with the philosophy of immutable container images; for instance, when inspecting the image run by a pod, it’s immediately clear which version of the Flink job it is.

The example project for this post contains a small Flink job which prints a sequence of numbers using Flink’s built-in DataGen source. You can build a container image for this job via Docker like so:

$ docker build -t decodable-examples/hello-world-job:1.0 hello-world-job
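Under the hood, such an image is defined by a Dockerfile which extends the upstream Flink image and adds the job JAR. A minimal sketch (assuming the JAR has been built to <span class="inline-code">target/</span> and is copied to the path referenced by <span class="inline-code">jarURI</span> in the deployment below) could look like this:

```dockerfile
# Base image matching the flinkVersion of the deployment
FROM flink:1.20-java17

# Add the job JAR at the location referenced by jarURI in the FlinkDeployment
COPY target/flink-hello-world-0.1.jar /opt/flink/examples/streaming/
```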

While you’d normally push this image to an image registry such as ECR, we’re simply going to load it directly into kind in this case:

$ kind load docker-image decodable-examples/hello-world-job:1.0 --name my-cluster

Next, we need to create a <span class="inline-code">FlinkDeployment</span> resource for this job. This is very similar to the one before, except that it now refers to our custom image containing the Flink job JAR file:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: custom-job
spec:
  image: decodable-examples/hello-world-job:1.0
  flinkVersion: v1_20
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/flink-hello-world-0.1.jar
    parallelism: 1
    upgradeMode: stateless

‍<div style="text-align: center">Listing 4: Resource definition custom-job.yaml for a custom Flink job</div>

Apply the descriptor:

$ kubectl apply -f flink/custom-job.yaml

Then verify that the job is running:

$ kubectl get FlinkDeployment custom-job
NAME         JOB STATUS   LIFECYCLE STATE
custom-job   RUNNING      STABLE

For further analysis, you can also take a look at the logs of the task manager pod like so:

$ kubectl logs -l app=custom-job,component=taskmanager -f
...
2025-01-20 11:07:40,323 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Generator Source -> Sink: Writer (1/1)#0 (d70586a2890af7b3459578ad30dcd550_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from DEPLOYING to INITIALIZING.
2025-01-20 11:07:40,365 INFO  org.apache.flink.runtime.state.StateBackendLoader            [] - State backend is set to heap memory org.apache.flink.runtime.state.hashmap.HashMapStateBackend@79439cec
2025-01-20 11:07:40,375 INFO  org.apache.flink.runtime.state.StateBackendLoader            [] - State backend is set to heap memory org.apache.flink.runtime.state.hashmap.HashMapStateBackend@177287da
2025-01-20 11:07:40,380 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Generator Source -> Sink: Writer (1/1)#0 (d70586a2890af7b3459578ad30dcd550_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from INITIALIZING to RUNNING.
Number: 0
Number: 1
Number: 2
...

This concludes part one of this blog post series. Head over to part two to learn about fault tolerance and high availability of your Flink jobs running on Kubernetes, savepoint management, observability, and more.


Gunnar Morling

Gunnar is an open-source enthusiast at heart, currently working on Apache Flink-based stream processing. In his prior role as a software engineer at Red Hat, he led the Debezium project, a distributed platform for change data capture. He is a Java Champion and has founded multiple open source projects such as JfrUnit, kcctl, and MapStruct.