Kubernetes is a widely used deployment platform for Apache Flink. While Flink has had native support for Kubernetes for quite a while, it is the operator pattern in particular which makes deploying Flink jobs onto Kubernetes clusters a compelling option: you define jobs in a declarative resource, and a control loop running in a component called a Kubernetes operator takes care of provisioning and maintaining (e.g. scaling, updating) all the required resources. Automation is the keyword here, significantly reducing the manual effort of running Flink jobs in production.
There are multiple Kubernetes operators for Flink, including Lyft’s flinkk8soperator, flink-on-k8s-operator by Spotify (a fork of a now unmaintained operator initially developed by Google), and flink-controller by Andrea Medeghini. The most widely used operator these days though is the upstream Flink Kubernetes Operator, developed under the umbrella of the Apache Flink project itself, which is also the focus of this two-part blog post series. The series gives you an overview of what it takes to install the operator and run your first jobs on Kubernetes, and discusses what’s missing to create a complete Flink-based data platform for an organization.
In part one, we’ll touch on installation and setup, deploying Flink jobs via custom resources, and creating container images for your own Flink jobs. Part two covers fault tolerance and high availability, savepoint management, observability, and UI access.
The idea for these posts is to give you a fully runnable example so you can try out everything yourself. To follow along, grab the complete source code in the Decodable examples repository on GitHub:
You’ll need the following prerequisites:
- Docker, for running and building container images
- kind, for setting up a local Kubernetes cluster, and kubectl for interacting with it
- Helm, for installing the Kubernetes operator
Fig. 1 gives an overview of the solution we are going to build in this blog series, with all the key resources and most relevant interactions:
Installation and Setup
Start by creating a local Kubernetes cluster using <span class="inline-code">kind</span>:
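With the configuration file from Listing 1 below in place, a single command should do (the cluster name defaults to <span class="inline-code">kind</span>):

```sh
kind create cluster --config kind-cluster.yml
```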
The cluster contains a control plane as well as a worker node. The kind configuration makes the directory /tmp/kind-worker available to the worker node of the cluster, which will allow you to keep Flink savepoints when rebuilding the kind cluster. It also maps ports 80 and 443 of the worker node to the host, which will allow you to access the Flink web UI via a Kubernetes ingress later on:
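Roughly, that configuration looks as follows; the in-node mount path is an assumption here, and the file in the example repository is the authoritative version:

```yaml
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
- role: worker
  # expose /tmp/kind-worker on the host inside the worker node,
  # so data written there survives cluster rebuilds
  extraMounts:
  - hostPath: /tmp/kind-worker
    containerPath: /tmp/kind-worker
  # map HTTP(S) ports of the worker node to the host for ingress access
  extraPortMappings:
  - containerPort: 80
    hostPort: 80
  - containerPort: 443
    hostPort: 443
```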
<div style="text-align: center">Listing 1: kind-cluster.yml for spinning up a local Kubernetes cluster</div>
Verify the cluster is running:
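For instance, by listing its nodes:

```sh
kubectl get nodes
```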
Installing the Flink Kubernetes operator is straightforward thanks to the provided Helm chart. Add the Helm repository for the current version 1.10 (released in October 2024) of the operator like so:
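Following the pattern from the operator’s quick start guide (adjust the version number if you’re on a newer release):

```sh
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/
```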
Before installing the operator itself, you need to install cert-manager (required for issuing the TLS certificates used by the operator’s admission controller):
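A single <span class="inline-code">kubectl</span> command based on the upstream release manifest should do; the cert-manager version shown here is an example:

```sh
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
```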
The operator Helm chart can be customized upon installation with a wide set of parameters, for instance to control which Kubernetes namespaces the operator should observe, which images should be used by default for deploying Flink jobs, etc. We’re going to make the following adjustments:
- Inject the configuration required for running Flink jobs on Java 17; while Flink itself has supported Java 17 since version 1.18, the operator in version 1.10 doesn’t pass the <span class="inline-code">--add-opens</span> flags required to run Flink on Java 17 and newer. This has been added recently (see FLINK-36646), but no release of the operator with those changes is available as of now
- Shorten the lifetime of Flink job managers after a job has been suspended; this setting, <span class="inline-code">kubernetes.operator.jm-deployment.shutdown-ttl</span>, defaults to one day, which means you’ll have a superfluous job manager pod in your cluster for quite a while when suspending a job
The Helm values file with these configuration amendments looks like so:
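Sketched out, assuming the chart’s <span class="inline-code">defaultConfiguration</span> mechanism is used to append to the operator’s Flink configuration defaults; the shutdown TTL value and the abridged JVM flag list are examples, the file in the example repository is the complete version:

```yaml
defaultConfiguration:
  create: true
  # append to, rather than replace, the operator's default configuration
  append: true
  flink-conf.yaml: |+
    # don't keep job manager pods of suspended jobs around for a full day
    kubernetes.operator.jm-deployment.shutdown-ttl: 5 min
    # JVM flags needed for running Flink on Java 17+ (abridged; take the full
    # list from the default configuration shipped with the Flink distribution)
    env.java.opts.all: --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
```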
<div style="text-align: center">Listing 2: helm-values.yaml for customizing the operator configuration</div>
Install the Helm chart, using this values file:
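Assuming the release name <span class="inline-code">flink-kubernetes-operator</span>:

```sh
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator -f helm-values.yaml
```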
This should yield the following output:
<div class="side-note">The Helm chart creates two Kubernetes roles and associated service accounts: <span class="inline-code">flink-operator</span>, used by the Flink operator for managing Flink deployments, and <span class="inline-code">flink</span>, used by the Flink job manager for spawning task manager pods. By default, the former is cluster-scoped, while the latter is namespace-scoped. In the following, we are going to deploy Flink jobs in the default namespace; if you want to deploy jobs in other namespaces, refer to the operator’s RBAC documentation to learn how to create the required role and service account.</div>
Next, we are going to create a deployment of MinIO to have an S3 backend for persisting Flink checkpoints and savepoints as well as Flink high availability data, independent of the lifetime of specific Flink pods. The MinIO deployment is taken pretty much verbatim from the upstream Kubernetes examples, except that the volume for persisting the MinIO data is backed by the aforementioned kind extra mount:
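A condensed sketch of that deployment; the MinIO image, resource names, and the host path are assumptions, the manifest in the example repository is authoritative:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: minio
spec:
  selector:
    matchLabels:
      app: minio
  template:
    metadata:
      labels:
        app: minio
    spec:
      containers:
      - name: minio
        image: quay.io/minio/minio:latest
        args: ["server", "/data"]
        ports:
        - containerPort: 9000
        volumeMounts:
        - name: data
          mountPath: /data
      volumes:
      - name: data
        # backed by the kind extra mount, so the data survives cluster rebuilds
        hostPath:
          path: /tmp/kind-worker/minio
          type: DirectoryOrCreate
---
apiVersion: v1
kind: Service
metadata:
  name: minio
spec:
  selector:
    app: minio
  ports:
  - port: 9000
```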
Create a bucket named <span class="inline-code">flink-data</span>:
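One way is to run the MinIO client as a one-off pod inside the cluster; the default <span class="inline-code">minioadmin</span> credentials and the <span class="inline-code">minio</span> service name from the sketch above are assumptions:

```sh
kubectl run minio-client --rm -it --restart=Never --image=minio/mc --command -- \
  /bin/sh -c "mc alias set local http://minio:9000 minioadmin minioadmin && mc mb local/flink-data"
```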
This concludes the installation steps and we are ready to run our first Flink job on Kubernetes.
Deployment Types
Following the Kubernetes operator pattern, Flink jobs are deployed as custom resources, whose structure is defined by custom resource definitions (CRDs). This means you define a job and its configuration in a resource descriptor (a YAML file), and the operator takes care of creating and managing all the required Kubernetes resources, such as pods for job and task managers, config maps for job configuration, ingresses for accessing the Flink web UI, etc. If you change a job’s definition (or delete it), the operator performs the steps required to keep the deployed resources in sync with the resource definition.
The Flink Kubernetes operator supports running Flink in both application and session mode, with dedicated resource definitions for each mode. In application mode, each job is executed in its own Flink cluster, i.e. with its own job manager. When the job is finished, that cluster will be shut down. The <span class="inline-code">FlinkDeployment</span> resource type is used for running jobs in application mode.
In contrast, Flink’s session mode utilizes a long-running Flink cluster onto which multiple jobs are deployed, using the <span class="inline-code">FlinkSessionJob</span> resource type. Session mode can be more resource-efficient due to the shared job manager, at the cost of reduced isolation between jobs.
In the following, we are going to use application mode, i.e., a dedicated Flink cluster per job.
Sticking to the recommendations from the operator documentation, we’ll deploy Flink using the Native Kubernetes resource provider, which means that Flink will directly talk to the Kubernetes API to orchestrate resources such as task manager pods. In particular when deploying jobs of unknown provenance, it can make sense to use the Standalone provider instead, in which case Flink jobs don’t have access to the Kubernetes API and the operator manages all the required resources itself.
Deploying Your First Flink Job on Kubernetes
The operator’s repository on GitHub contains a number of very useful example resource definitions. Slightly adjusted to run with Flink 1.20 on Java 17, the most basic <span class="inline-code">FlinkDeployment</span> definition for running one of the example jobs shipping with Flink looks like this:
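In sketched form (the Java 17 image tag is an assumption; the exact file is in the example repository):

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.20-java17
  flinkVersion: v1_20
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless
```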
<div style="text-align: center">Listing 3: Resource definition basic.yaml for a first Flink job</div>
This runs the StateMachineExample.jar job on Flink 1.20 with a parallelism of 2, assigning 1 CPU core and 2 GB of RAM to each of the job manager and task manager pods. No execution state will be retained between job runs (upgrade mode <span class="inline-code">stateless</span>). To deploy this job, apply the resource definition using <span class="inline-code">kubectl</span>:
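Using the file name from Listing 3:

```sh
kubectl apply -f basic.yaml
```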
As the operator exposes Flink deployments just like any other kind of Kubernetes resource, you can examine their status with kubectl, too (output adjusted for readability):
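For instance, with the deployment name from the sketch above:

```sh
kubectl get flinkdeployment basic-example -o yaml
```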
When deploying a job, the operator also creates a Kubernetes Service for exposing the Flink web UI. For testing purposes, you can forward its port, allowing you to access the UI on http://localhost:8081:
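Flink’s native Kubernetes integration names that service after the deployment with a <span class="inline-code">-rest</span> suffix, so something along these lines should work:

```sh
kubectl port-forward svc/basic-example-rest 8081
```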
Next, let’s make some changes to the job definition. Currently, the job runs with a parallelism of 2. As there are also two slots per task manager, a single task manager pod is running (besides the job manager pod) for the job:
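You can confirm this by listing the pods:

```sh
kubectl get pods
```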
Patch the resource to change the parallelism to 4:
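For example with a merge patch, again assuming the deployment name <span class="inline-code">basic-example</span>:

```sh
kubectl patch flinkdeployment basic-example --type merge --patch '{"spec": {"job": {"parallelism": 4}}}'
```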
The operator will recognize the change and reconcile the deployment accordingly. Shortly thereafter, you’ll see another task manager pod running:
If you go back to the Flink web UI (recreate the port forwarding, if needed), you’ll also see that there are two task managers now, with four task slots overall:
Finally, to delete the job, run the following:
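Either delete the custom resource by name (as assumed above) or via the original file:

```sh
kubectl delete flinkdeployment basic-example
# or, equivalently:
# kubectl delete -f basic.yaml
```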
Besides Java, you can also implement your stream processing jobs using Flink’s Python API, PyFlink. Refer to this post to learn how to get started with running your first PyFlink jobs on Kubernetes.
Building Custom Job Images
So far, we’ve run an example Flink job which is contained within the upstream Flink container image. In practice, of course, you’ll want to run your own Flink job JARs. Multiple options exist for doing so:
- Adding an init container to the Flink pods which retrieves the JAR at start-up from a remote location such as an S3 bucket, as shown in this blog post
- Retrieving the JAR from an HTTP server—for instance an internal Maven repository—by specifying an HTTP URI for the <span class="inline-code">jarURI</span> parameter; when using Flink 1.19 or later, the parameter also accepts S3 URIs, provided an S3 file system plug-in has been configured (see FLINK-28915)
- Building a custom container image, extending the upstream Flink image and adding your job JAR file at build time
While all these approaches do the trick, my personal preference and recommendation is the last one: building a custom image with your job, rather than fetching the JAR at runtime. This can help speed things up when restarting a job (as the image may be retrieved from a node-local image cache) and generally is more in line with the philosophy of immutable container images; for instance, when inspecting the image run by a pod, it’s immediately clear which version of the Flink job this is.
The example project of this post contains a small Flink job, which prints out a sequence of numbers, using Flink’s built-in DataGen source. You can build a container image for this job via Docker like so:
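The Dockerfile boils down to extending the upstream Flink image and adding the job JAR; the image tag, JAR name, and target path below are illustrative assumptions, the actual files live in the example repository:

```dockerfile
# extend the upstream image matching the Flink/Java version used above
FROM flink:1.20-java17
# add the job JAR produced by the project build (e.g. mvn package)
COPY target/data-gen-job.jar /opt/flink/usrlib/data-gen-job.jar
```

Then build and tag the image:

```sh
docker build -t data-gen-job:latest .
```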
While you’d normally push this image to an image registry such as ECR, we’re simply going to load it directly into kind in this case:
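Assuming the image tag from above and the default kind cluster name:

```sh
kind load docker-image data-gen-job:latest
```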
Next, we need to create a <span class="inline-code">FlinkDeployment</span> resource for this job. This is very similar to the one before, only that now we’re referring to our custom image containing the Flink job JAR file:
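Sketched out, reusing the image tag and JAR path from the Dockerfile above (the exact file is in the example repository):

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: data-gen-job
spec:
  image: data-gen-job:latest
  flinkVersion: v1_20
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/usrlib/data-gen-job.jar
    parallelism: 2
    upgradeMode: stateless
```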
<div style="text-align: center">Listing 4: Resource definition custom-job.yaml for a custom Flink job</div>
Apply the descriptor:
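Using the file name from Listing 4:

```sh
kubectl apply -f custom-job.yaml
```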
Then verify the job status is running:
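For instance, with the deployment name from the sketch above:

```sh
kubectl get flinkdeployment data-gen-job
```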
For further analysis, you can also take a look at the logs of the task manager pod like so:
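The task manager pods created by the native Kubernetes integration typically carry a <span class="inline-code">component=taskmanager</span> label as well as an <span class="inline-code">app</span> label matching the deployment name, which lets you select them without knowing the generated pod names:

```sh
kubectl logs -f -l app=data-gen-job,component=taskmanager
```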
This concludes part one of this blog post series. Head over to part two to learn about fault tolerance and high availability of your Flink jobs running on Kubernetes, savepoint management, observability, and more.