January 28, 2025
12 min read

Get Running with Apache Flink on Kubernetes, part 2 of 2

By Gunnar Morling

Welcome back to this two-part blog post series about running Apache Flink on Kubernetes, using the Flink Kubernetes operator. In part one, we discussed installation and setup of the operator, different deployment types, how to deploy Flink jobs using custom Kubernetes resources, and how to create container images for your own Flink jobs. In this part, we’ll focus on aspects such as fault tolerance and high availability of your Flink jobs running on Kubernetes, savepoint management, observability, and more. You can find the complete source code for all the examples shown in this series in the Decodable examples repository on GitHub.

Fault Tolerance and High Availability

So far, we don’t have any means of fault tolerance or high availability (HA) in place for our job. If, for instance, a task or job manager pod crashes, all the progress the job has made will be lost. Similarly, its state is not retained across restarts: if you change the job’s configuration or suspend and restart it, it will start from the beginning. To ensure that no state is lost in case of failures or (intentional) restarts, a few things need to be put in place, as described next.

<div class="side-note">It also is possible to ensure HA for the Flink Kubernetes operator itself. To do so, enable leader election for the operator, as described in the documentation. You then can run multiple replicas of the operator, with one of them being the leader and others in a stand-by capacity, ready to take over should the current leader fail.<br/><br/>Whether this is necessary or not, depends on the requirements of the specific use case. Even without operator HA, existing job deployments are not affected by an operator failure and will continue to run. It thus may be acceptable to have short downtimes of the operator, provided the right monitoring is in place to detect the situation swiftly and resolve it manually.</div>

As Kubernetes pods are ephemeral, the corresponding state needs to be persisted externally. Oftentimes, object storage such as an S3 bucket is used for this purpose, thus avoiding the need to mount any persistent volumes to the Flink pods. Based on the upstream HA example resource, the following shows how to configure checkpoints, savepoints, and job manager HA, using the bucket in the MinIO instance we set up before:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: custom-job-ha
spec:
  image: decodable-examples/flink-hello-world:1.0
  flinkVersion: v1_20
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"

    s3.access.key: minio
    s3.secret.key: minio123
    s3.endpoint: http://minio-service.default.svc.cluster.local:9000
    s3.path.style.access: "true"

    state.backend: rocksdb
    state.backend.incremental: "true"
    state.checkpoints.dir: s3://flink-data/checkpoints
    state.savepoints.dir: s3://flink-data/savepoints

    high-availability.type: kubernetes
    high-availability.storageDir: s3://flink-data/ha
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: ENABLE_BUILT_IN_PLUGINS
              value: "flink-s3-fs-presto-1.20.0.jar"
  job:
    jarURI: local:///opt/flink/examples/streaming/flink-hello-world-1.0.jar
    parallelism: 2
    upgradeMode: savepoint
    state: running

<div style="text-align: center">Listing 1: Resource definition custom-job-ha.yaml for a custom Flink job with fault tolerance and high availability</div>

Deploy the job by applying the resource:

$ kubectl apply -f flink/custom-job-ha.yaml

Quite a few things are going on here, so let’s digest them one by one. First, the directories for storing checkpoints and savepoints are configured using the <span class="inline-code">state.checkpoints.dir</span> and <span class="inline-code">state.savepoints.dir</span> options, respectively. We are using RocksDB as the Flink state backend and enabling incremental checkpointing, which avoids transferring the complete state upon each checkpoint and can substantially reduce checkpoint durations. While not that relevant for a small-scale example like the one at hand, it can also make sense to use S3 entropy injection: by adding a random part at the beginning of checkpoint paths, the data will be distributed across multiple partitions of the S3 bucket.
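As a rough sketch, enabling entropy injection would add something like the following to the <span class="inline-code">flinkConfiguration</span> section (the <span class="inline-code">_entropy_</span> marker is replaced with random characters at runtime; the key name and length shown here are just illustrative values):

    s3.entropy.key: _entropy_
    s3.entropy.length: "4"
    state.checkpoints.dir: s3://flink-data/_entropy_/checkpoints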

To enable Flink’s job manager HA services, <span class="inline-code">high-availability.type</span> must be set to <span class="inline-code">kubernetes</span> and a directory for storing job manager metadata must be given via <span class="inline-code">high-availability.storageDir</span>. In order for Flink to access data on S3, an S3 file system plug-in is enabled via the <span class="inline-code">ENABLE_BUILT_IN_PLUGINS</span> environment variable set for the Flink containers in the pod template of the resource descriptor. Flink provides two plug-ins for S3; we are using the Presto one as this is the recommended option for storing checkpoint data.

For accessing the S3 API provided by MinIO in this example, we are providing the access key and secret key right within the resource definition itself. In a production use case storing data in an S3 bucket on AWS, this should be avoided; instead, authentication should happen via IAM, granting the Flink pods access to S3 via an IAM role.
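On Amazon EKS, for instance, this could be done with IAM Roles for Service Accounts (IRSA): the <span class="inline-code">flink</span> service account gets annotated with a role ARN, and the <span class="inline-code">s3.access.key</span> and <span class="inline-code">s3.secret.key</span> entries are dropped from the resource. The following is just a sketch with a placeholder role ARN; depending on the S3 file system plug-in in use, the credentials provider may need additional configuration:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::123456789012:role/flink-s3-access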

Lastly, by setting <span class="inline-code">job.upgradeMode</span> to <span class="inline-code">savepoint</span>, the operator will trigger a savepoint when suspending a job and start from that savepoint when resuming. To verify that this actually works, make a change to the resource descriptor (for instance, adjust its parallelism) and apply it again. If you then examine the logs of the task manager pod, you should notice that the emitted sequence numbers don’t start at 1 but continue from the point where the job left off before.
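One way to check this is to tail the task manager logs via the labels which Flink’s native Kubernetes integration puts onto the pods (a sketch; adjust the selector if your labels differ):

$ kubectl logs -l app=custom-job-ha,component=taskmanager --tail=20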

You can also suspend a job explicitly, if for instance your use case doesn’t require real-time processing of data and you want to save on compute resources. To do so, patch the job’s target state to <span class="inline-code">suspended</span>:

$ kubectl patch FlinkDeployment custom-job-ha --patch '{"spec" : { "job" : { "state" : "suspended" }}}' --type=merge

Once the operator picks up this change, it will shut down the job, removing the job manager and task manager pods. The actual <span class="inline-code">FlinkDeployment</span> resource remains present, though, now indicating that it is suspended:

$ kubectl get FlinkDeployment custom-job-ha
NAME            JOB STATUS   LIFECYCLE STATE
custom-job-ha   FINISHED     SUSPENDED

To resume the job, patch the target state back to <span class="inline-code">running</span>, after which you should be able to observe that the job continues from the latest savepoint. The cool thing is that savepoints created that way are represented by Kubernetes resources, too. The operator provides a dedicated resource type for this, <span class="inline-code">FlinkStateSnapshot</span>, which you can query to obtain the list of savepoints:

$ kubectl get FlinkStateSnapshots
NAME                                            PATH                                                       RESULT TIMESTAMP              SNAPSHOT STATE
custom-job-ha-savepoint-upgrade-1736963811478   s3://flink-data/savepoints/savepoint-52e52e-58013f2a2604   2025-01-15T17:56:51.536782Z   COMPLETED
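For reference, resuming is the same patch as before, with the target state set back to <span class="inline-code">running</span>:

$ kubectl patch FlinkDeployment custom-job-ha --patch '{"spec" : { "job" : { "state" : "running" }}}' --type=merge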

Manually Triggering Savepoints

It is also possible to manually create savepoints and use them when resuming a suspended job, or when creating a new instance of that job. To do so, create and apply a <span class="inline-code">FlinkStateSnapshot</span> resource, referencing the job to snapshot:

apiVersion: flink.apache.org/v1beta1
kind: FlinkStateSnapshot
metadata:
  name: example-savepoint
spec:
  backoffLimit: 1
  jobReference:
    kind: FlinkDeployment
    name: custom-job-ha
  savepoint: {}

<div style="text-align: center">Listing 2: Resource definition savepoint.yaml for manually triggering a savepoint</div>

$ kubectl apply -f flink/savepoint.yaml

In order to start a job from this savepoint, retrieve its path on S3 via <span class="inline-code">kubectl describe FlinkStateSnapshot example-savepoint</span> and specify it under <span class="inline-code">spec.job.initialSavepointPath</span> in the <span class="inline-code">FlinkDeployment</span> resource of the job:

...
spec:
  job:
    jarURI: local:///opt/flink-jobs/flink-hello-world-1.0.jar
    upgradeMode: savepoint
    state: running
    initialSavepointPath: s3://flink-data/savepoints/savepoint-eba39a-1fd698a34a01
...
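Alternatively, the path can be extracted with a JSONPath query, assuming it is exposed in the snapshot resource’s status (which is what the PATH column shown earlier is derived from):

$ kubectl get FlinkStateSnapshot example-savepoint -o jsonpath='{.status.path}'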

Note that if you’d like to restart an existing job from a specific savepoint, you also need to specify <span class="inline-code">savepointRedeployNonce</span>. Set it to 1 if it hasn’t been set before; otherwise, increment its value for each restart:

...
initialSavepointPath: s3://flink-data/savepoints/savepoint-eba39a-1fd698a34a01
savepointRedeployNonce: 1
...

Observability

Once you move your Flink jobs to production, it is critical to have good observability in place for all the components, allowing you to examine the system’s state and react to failures, degraded performance, etc. Typically, an application’s logs, metrics, and traces are ingested into services and tools such as Datadog, New Relic, or OpenSearch for this purpose.

It’s beyond the scope of this post to provide a comprehensive description of an observability solution for Flink deployments. To give you a starting point though, the accompanying repository contains a basic example setup which shows how to ingest logs from job and task manager pods into Elasticsearch, allowing you to analyze them in a Kibana dashboard.

Install the Elasticsearch Kubernetes operator, then set up Elasticsearch and Kibana by running the following:

$ kubectl create -f https://download.elastic.co/downloads/eck/2.16.0/crds.yaml
$ kubectl apply -f https://download.elastic.co/downloads/eck/2.16.0/operator.yaml
$ kubectl -n logging apply -f logging/elastic.yaml
$ kubectl -n logging apply -f logging/kibana.yaml
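For reference, logging/elastic.yaml and logging/kibana.yaml follow the ECK quickstart pattern; the following is a rough sketch of what they might contain (the version and node settings in the repository may differ):

apiVersion: elasticsearch.k8s.elastic.co/v1
kind: Elasticsearch
metadata:
  name: quickstart
spec:
  version: 8.16.1
  nodeSets:
    - name: default
      count: 1
      config:
        node.store.allow_mmap: false
---
apiVersion: kibana.k8s.elastic.co/v1
kind: Kibana
metadata:
  name: quickstart
spec:
  version: 8.16.1
  count: 1
  elasticsearchRef:
    name: quickstart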

The Kubernetes Logging operator is used for collecting and forwarding log files. You can deploy it with the following Helm command:

$ helm upgrade --install --wait --create-namespace --namespace logging logging-operator oci://ghcr.io/kube-logging/helm-charts/logging-operator

Next, configure a <span class="inline-code">Logging</span> instance:

$ kubectl -n logging apply -f logging/logging.yaml
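The Logging resource itself is fairly small; here is a minimal sketch along the lines of the kube-logging quickstart (logging/logging.yaml in the repository may differ in its details):

apiVersion: logging.banzaicloud.io/v1beta1
kind: Logging
metadata:
  name: default-logging
spec:
  controlNamespace: logging
  fluentd: {}
  fluentbit: {}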

This installs an agent (<span class="inline-code">fluentbit</span>) on each worker node of the Kubernetes cluster, which collects the logs of the pods on that node—enriching them with Kubernetes metadata such as pod labels—and forwards them to a collector service (<span class="inline-code">fluentd</span>). There, they are filtered and transformed as per the current configuration and finally forwarded to Elasticsearch. By using log4j2’s JsonTemplateLayout for Flink log messages (configured here), they are emitted in structured form, making them very easy to process:

...
{"@timestamp":"2025-01-17T09:51:55.295Z","ecs.version":"1.2.0","log.level":"INFO","message":"Completed checkpoint 360 for job 2bd169b220ae0d7e26f3edf0869c244f (15960 bytes, checkpointDuration=9 ms, finalizationTime=0 ms).","process.thread.name":"jobmanager-io-thread-1","log.logger":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","flink-job-id":"2bd169b220ae0d7e26f3edf0869c244f"}
...
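Such a structured layout can be set up via the <span class="inline-code">logConfiguration</span> section of the <span class="inline-code">FlinkDeployment</span> resource. The following is only a sketch, assuming the log4j-layout-template-json dependency is available on the Flink classpath (for instance, added to the container image built in part one); the configuration used in the example repository may differ:

spec:
  logConfiguration:
    log4j-console.properties: |
      rootLogger.level = INFO
      rootLogger.appenderRef.console.ref = ConsoleAppender
      appender.console.name = ConsoleAppender
      appender.console.type = CONSOLE
      appender.console.layout.type = JsonTemplateLayout
      appender.console.layout.eventTemplateUri = classpath:EcsLayout.json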

The ingestion pipeline into Elasticsearch is set up via two resources processed by the Kubernetes Logging operator. The first is a <span class="inline-code">ClusterOutput</span> resource, describing the destination of the pipeline, i.e. the Elasticsearch endpoint, the credentials, etc.:

apiVersion: logging.banzaicloud.io/v1beta1
kind: ClusterOutput
metadata:
  name: es-output
spec:
  elasticsearch:
    host: quickstart-es-http.logging.svc.cluster.local
    port: 9200
    scheme: https
    ssl_verify: false
    ssl_version: TLSv1_2
    user: elastic
    password:
      valueFrom:
        secretKeyRef:
          name: quickstart-es-elastic-user
          key: elastic
    buffer:
      timekey: 1m
      timekey_wait: 30s
      timekey_use_utc: true

<div style="text-align: center">Listing 3: Resource definition elastic-output.yaml, describing the output of the logging pipeline</div>

Create the output:

$ kubectl -n logging apply -f logging/elastic-output.yaml

The second resource is of type <span class="inline-code">Flow</span> and describes the set of pods to ingest from (leveraging the component labels set up by the Flink Kubernetes operator), as well as the parsing logic and destination of the log pipeline:

apiVersion: logging.banzaicloud.io/v1beta1
kind: Flow
metadata:
  name: es-flow
spec:
  filters:
    - tag_normaliser: {}
    - parser:
        remove_key_name_field: true
        reserve_data: true
        parse:
          type: json
          time_key: "@timestamp"
  match:
    - select:
        labels:
          component: jobmanager
    - select:
        labels:
          component: taskmanager
  globalOutputRefs:
    - es-output

<div style="text-align: center">Listing 4: Resource definition elastic-flow.yaml, describing the logic of the logging pipeline</div>

Create the flow:

$ kubectl apply -f logging/elastic-flow.yaml

Once these resources are in place, the Kubernetes Logging operator will set up the required infrastructure for extracting and propagating the Flink logs, utilizing fluentbit and fluentd. The logs will be ingested into Elasticsearch, where you can inspect and query them using Kibana. To access Kibana, retrieve the password of the <span class="inline-code">elastic</span> user like so:

$ kubectl -n logging get secret quickstart-es-elastic-user -o=jsonpath='{.data.elastic}' | base64 --decode; echo
9V9520L2yvgzJla0RIyQo784

Then forward port 5601 of the “quickstart-kb-http” service:

$ kubectl -n logging port-forward service/quickstart-kb-http 5601

Navigate to https://localhost:5601/ (accept the browser warning caused by the self-signed certificate) and log in, then go to “Analytics” → “Discover” and create a data view for the index named <span class="inline-code">fluentd</span>:

Fig. 1: Flink job logs ingested into Elasticsearch

As mentioned before, this is just scratching the surface when it comes to setting up observability infrastructure for Flink. There’s a wide range of tools and platforms in that space, and you should choose what matches your requirements, integrating with the solutions and infrastructure already in use in your organization. Besides logging, your observability strategy will typically also cover metrics (which are supported by Flink via a range of metric reporters, for instance for Prometheus, InfluxDB, and Datadog) and traces (e.g. supported via the OpenTelemetry trace exporter). Java-specific tools such as JDK Flight Recorder can also be invaluable for gaining insight into the performance characteristics and other runtime behaviors of your Flink jobs.
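On the metrics side, for example, exposing Flink’s metrics to Prometheus can be as simple as adding the built-in Prometheus reporter to the <span class="inline-code">flinkConfiguration</span> section of the deployment (a sketch; the reporter name <span class="inline-code">prom</span> is an arbitrary choice, and 9249 is the reporter’s default port):

    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port: "9249"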

Bonus: Managing Flink Jobs With the Heimdall UI

To make the Flink web UI accessible in a production deployment, the operator can optionally set up a Kubernetes Ingress resource for each job, exposing its UI for external access. To do so, simply add the following section to the job’s resource definition:

ingress:
  template: "localhost/{{name}}(/|$)(.*)"
  className: "nginx"
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: "/$2"

You’ll also need an ingress controller, which you can install for the local kind cluster like so:

$ helm upgrade --install ingress-nginx ingress-nginx \
  --repo https://kubernetes.github.io/ingress-nginx \
  --namespace ingress-nginx --create-namespace --set controller.nodeSelector."kubernetes\.io/hostname"=my-cluster-worker

$ kubectl apply -f heimdall/deploy-ingress-nginx.yaml

Once the controller is running, you can access the Flink web UI from your local host without requiring any port forwarding, for instance at http://localhost/custom-job-ha/ for the <span class="inline-code">custom-job-ha</span> job.

With a large number of jobs, it can become challenging to keep track of all the deployed jobs and where to find their UIs, logs, or metrics. This is where Heimdall, a project by Yaroslav Tkachenko, comes into the picture: it provides one unified UI for accessing all the Flink jobs executed via the operator on a Kubernetes cluster. You can install it into the cluster by running this:

$ kubectl apply -f heimdall/heimdall.yaml

Once running, the UI provides a list of all the deployed jobs, their status, resources, etc. Forward its port to access it from your host:

$ kubectl port-forward service/heimdall 8080
Fig. 2: Heimdall, a UI for managing multiple Flink jobs on one Kubernetes cluster

The endpoint URLs in the Heimdall UI are customizable, so that you can easily link to the right locations of Flink UI, API, metrics, and logs of your specific environment.

Summary and Discussion

The Apache Flink Kubernetes Operator is becoming increasingly popular for running Flink jobs on Kubernetes. Following the well-established Kubernetes operator pattern, it allows you to deploy and run your stream processing jobs by creating declarative resources which are processed in a reconciliation loop by the operator. This means you get to focus on the “What?” of your job, with a lot of the “How?” being taken care of automatically for you. Rather than having to deal with details such as configuring deployments, services, and ingresses, the operator lets you express your intent in a higher-level resource and derives all the required lower-level resources from that.

There are quite a few features of the operator which we couldn’t discuss in this post, most prominently its capabilities for auto-scaling (i.e. scaling jobs automatically up and down in order to minimize back pressure while also satisfying given utilization targets) and auto-tuning Flink jobs (automatically adjusting the memory consumed by a job). Both are very promising and warrant their own article.

<div class="side-note">The Flink Kubernetes operator is a versatile solution for deploying and managing Flink jobs. When it comes to building a full-fledged Flink-based stream processing platform for an organization, there’s a non-trivial amount of additional aspects to keep in mind, including the following:<ul><li>The management and evolution of data schemas</li><li>Developer experience, for instance the ability to deploy SQL jobs and preview their results</li><li>Higher-level resources such as source and sink connectors or declarative end-to-end data pipelines</li><li>Cost attribution and quota management between the teams and departments of a larger organization</li><li>Security and Compliance with regulatory requirements such as HIPPA, GDPR, SOC 2</li><li>Operator upgrades as well as upgrades of (stateful) Flink jobs</li></ul>If all this sounds like it’s too much work, a fully-managed platform like Decodable might be interesting to you. Sign up for a free trial today and give it a try.</div>

Another interesting feature of the operator is the ability to customize its behavior via plug-ins. This can be useful if, for instance, you want to make sure all deployed jobs adhere to specific standards (by implementing a custom resource validator), or if you want to amend their configuration upon deployment, e.g. to add specific pod labels automatically (by implementing a resource mutator).

The Flink Kubernetes operator is under active development and definitely a project worth keeping an eye on. For upcoming versions, the team has planned to work on a better “rollback mechanism and stability conditions” as well as several improvements to the autoscaler.

Gunnar Morling

Gunnar is an open-source enthusiast at heart, currently working on Apache Flink-based stream processing. In his prior role as a software engineer at Red Hat, he led the Debezium project, a distributed platform for change data capture. He is a Java Champion and has founded multiple open source projects such as JfrUnit, kcctl, and MapStruct.
