Welcome back to this two-part blog post series about running Apache Flink on Kubernetes, using the Flink Kubernetes operator. In part one, we discussed installation and setup of the operator, different deployment types, how to deploy Flink jobs using custom Kubernetes resources, and how to create container images for your own Flink jobs. In this part, we’ll focus on aspects such as fault tolerance and high availability of your Flink jobs running on Kubernetes, savepoint management, observability, and more. You can find the complete source code for all the examples shown in this series in the Decodable examples repository on GitHub.
Fault Tolerance and High Availability
So far, we don’t have any means of fault tolerance or high availability (HA) in place for our job. If for instance a task or job manager pod crashes, all progress the job has made will be lost. Similarly, its state is not retained across restarts. If you change the job’s configuration or suspend and restart it, it will start from the beginning. To ensure that no state is lost in case of failures or (intentional) restarts, the following is required:
- Enabling checkpointing for the Flink job, which allows it to recover from task manager failures
- Enabling job manager HA, which allows job manager failures to be handled gracefully
- Taking savepoints, which allow you to create a consistent snapshot of a job and resume from that snapshot later on
<div class="side-note">It also is possible to ensure HA for the Flink Kubernetes operator itself. To do so, enable leader election for the operator, as described in the documentation. You then can run multiple replicas of the operator, with one of them being the leader and others in a stand-by capacity, ready to take over should the current leader fail.<br/><br/>Whether this is necessary or not, depends on the requirements of the specific use case. Even without operator HA, existing job deployments are not affected by an operator failure and will continue to run. It thus may be acceptable to have short downtimes of the operator, provided the right monitoring is in place to detect the situation swiftly and resolve it manually.</div>
As Kubernetes pods are ephemeral, the corresponding state needs to be persisted externally. Oftentimes, object storage such as an S3 bucket is used for this purpose, thus avoiding the need to mount any persistent volumes to the Flink pods. Based on the upstream HA example resource, the following shows how to configure checkpoints, savepoints, and job manager HA, using the bucket in the MinIO instance we set up before:
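A sketch of what this resource could look like is shown in Listing 1; the container image, the <span class="inline-code">flink-data</span> bucket name, the MinIO endpoint, and the credentials are placeholders which need to match the set-up from part one:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: custom-job-ha
spec:
  image: custom-job:latest                  # placeholder: the image built in part one
  flinkVersion: v1_19                       # adjust to the Flink version of your image
  serviceAccount: flink
  flinkConfiguration:
    # checkpoints, savepoints, and HA metadata are stored in MinIO via the Presto S3 plug-in
    state.backend.type: rocksdb
    state.backend.incremental: "true"
    state.checkpoints.dir: s3p://flink-data/_entropy_/checkpoints
    state.savepoints.dir: s3p://flink-data/savepoints
    s3.entropy.key: _entropy_
    s3.entropy.length: "4"
    high-availability.type: kubernetes
    high-availability.storageDir: s3p://flink-data/ha
    s3.endpoint: http://minio:9000          # placeholder: MinIO service endpoint
    s3.path.style.access: "true"
    s3.access-key: minio                    # for the local example only,
    s3.secret-key: minio123                 # don't put credentials here in production
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            # load the Presto S3 file system plug-in shipped with the Flink distribution;
            # the version must match the Flink version of the image
            - name: ENABLE_BUILT_IN_PLUGINS
              value: flink-s3-fs-presto-1.19.1.jar
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/usrlib/custom-job.jar   # placeholder: path of the job jar within the image
    parallelism: 1
    upgradeMode: savepoint
    state: running
```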
<div style="text-align: center">Listing 1: Resource definition custom-job-ha.yaml for a custom Flink job with fault tolerance and high availability</div>
Deploy the job by applying the resource:
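Assuming the resource definition from Listing 1 is stored in a file named <span class="inline-code">custom-job-ha.yaml</span>:

```sh
kubectl apply -f custom-job-ha.yaml
```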
Quite a few things are going on here, so let’s digest them one by one. First, the directories for storing checkpoints and savepoints are configured using the <span class="inline-code">state.checkpoints.dir</span> and <span class="inline-code">state.savepoints.dir</span> options, respectively. We are using RocksDB as the Flink state backend, with incremental checkpointing enabled; this avoids transferring the complete state upon each checkpoint, which can substantially reduce checkpoint durations. While not that relevant for a small-scale example like the one at hand, it can also make sense to use S3 entropy injection: by adding a random part at the beginning of checkpoint paths, the data will be distributed across multiple shards of the S3 bucket.
To enable Flink’s job manager HA services, <span class="inline-code">high-availability.type</span> must be set to <span class="inline-code">kubernetes</span> and a directory for storing job manager metadata must be given via <span class="inline-code">high-availability.storageDir</span>. In order for Flink to access data on S3, an S3 file system plug-in is enabled via the <span class="inline-code">ENABLE_BUILT_IN_PLUGINS</span> environment variable set for the Flink containers in the pod template of the resource descriptor. Flink provides two plug-ins for S3; we are using the Presto one as this is the recommended option for storing checkpoint data.
For accessing the S3 API provided by MinIO in this example, we are providing the access key and secret key right within the resource definition itself. In a production use case storing data in an S3 bucket on AWS, this should be avoided; instead, authentication should happen via IAM, for instance by granting access to S3 via an IAM role for the Flink pods.
Lastly, by setting <span class="inline-code">job.upgradeMode</span> to <span class="inline-code">savepoint</span>, the operator will trigger a savepoint when suspending a job and start from that savepoint when resuming. To verify that this actually works, make a change to the resource descriptor (for instance to change its parallelism to 2) and apply it again. If you examine the logs of the task manager pod subsequently, you should notice that the emitted sequence numbers don’t start at 1 but at the point where the job left off before.
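For instance, after bumping <span class="inline-code">spec.job.parallelism</span> to 2 in the resource file, you could re-apply it and tail the task manager logs; the label selector below relies on the standard labels set by Flink’s native Kubernetes integration:

```sh
kubectl apply -f custom-job-ha.yaml
kubectl logs -f -l app=custom-job-ha,component=taskmanager
```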
You can also suspend a job explicitly, if for instance your use case doesn’t require real-time processing of data and you want to save on compute resources. To do so, patch the job’s target state to suspended:
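For the job at hand, this could look as follows:

```sh
kubectl patch flinkdeployment/custom-job-ha --type merge \
  --patch '{"spec": {"job": {"state": "suspended"}}}'
```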
Once the operator picks up this change, it will shut down the job, removing the job manager and task manager pods. The actual <span class="inline-code">FlinkDeployment</span> resource remains present, though, now indicating that it is suspended:
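For instance, retrieving the resource should show the job as suspended:

```sh
kubectl get flinkdeployment custom-job-ha
```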
To resume the job, patch the target state back to <span class="inline-code">running</span>, after which you should be able to observe that the job continues from the latest savepoint. The cool thing is that savepoints created that way are represented by Kubernetes resources, too. The operator provides a dedicated resource type for this, <span class="inline-code">FlinkStateSnapshot</span>, which you can query to obtain the list of savepoints:
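For example, to list all snapshots in the current namespace:

```sh
kubectl get flinkstatesnapshots
```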
Manually Triggering Savepoints
It is also possible to manually create savepoints and use them when resuming a suspended job, or when creating a new instance of that job. To do so, create and apply a <span class="inline-code">FlinkStateSnapshot</span> resource, referencing the job to snapshot:
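A sketch of such a resource for the <span class="inline-code">custom-job-ha</span> job is shown in Listing 2:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkStateSnapshot
metadata:
  name: example-savepoint
spec:
  jobReference:
    kind: FlinkDeployment
    name: custom-job-ha   # the job to take a savepoint of
  savepoint: {}           # take a savepoint (rather than a checkpoint) with default settings
```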
<div style="text-align: center">Listing 2: Resource definition savepoint.yaml for manually triggering a savepoint</div>
In order to start a job from this savepoint, retrieve its path on S3 via <span class="inline-code">kubectl describe FlinkStateSnapshot example-savepoint</span> and specify it under <span class="inline-code">spec.job.initialSavepointPath</span> in the <span class="inline-code">FlinkDeployment</span> resource of the job:
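An excerpt of what this could look like (the actual path is the one reported for the snapshot resource):

```yaml
spec:
  job:
    # placeholder: use the path shown by kubectl describe FlinkStateSnapshot example-savepoint
    initialSavepointPath: s3p://flink-data/savepoints/savepoint-...
    upgradeMode: savepoint
```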
Note that if you’d like to restart an existing job from a specific savepoint, you also need to specify <span class="inline-code">savepointRedeployNonce</span> in addition. Set it to 1 when it hasn’t been set before, otherwise increment its value for each restart:
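For example:

```yaml
spec:
  job:
    initialSavepointPath: s3p://flink-data/savepoints/savepoint-...   # placeholder path
    savepointRedeployNonce: 1   # increment for each subsequent redeployment from a savepoint
```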
Observability
Once you are moving your Flink jobs to production, it is critical to have good observability for all the components in place, allowing you to examine the system’s state and react to failures, degraded performance, etc. Typically, an application’s logs, metrics, and traces are imported into services and tools such as Datadog, New Relic, or OpenSearch for this purpose.
It’s beyond the scope of this post to provide a comprehensive description of an observability solution for Flink deployments. To give you a starting point though, the accompanying repository contains a basic example setup which shows how to ingest logs from job and task manager pods into Elasticsearch, allowing you to analyze them in a Kibana dashboard.
Install the Elasticsearch Kubernetes operator, then set up Elasticsearch and Kibana by running the following:
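The example setup is based on ECK (Elastic Cloud on Kubernetes); a sketch of the required steps could look like this, with the operator and stack versions being placeholders and the <span class="inline-code">quickstart</span> resource names being reused further below:

```sh
# install the ECK operator (version is a placeholder, check the ECK docs for the latest one)
kubectl create -f https://download.elastic.co/downloads/eck/2.14.0/crds.yaml
kubectl apply -f https://download.elastic.co/downloads/eck/2.14.0/operator.yaml

# create a single-node Elasticsearch cluster and a Kibana instance, both named "quickstart"
kubectl apply -f - <<EOF
apiVersion: elasticsearch.k8s.elastic.co/v1
kind: Elasticsearch
metadata:
  name: quickstart
spec:
  version: 8.14.0
  nodeSets:
    - name: default
      count: 1
      config:
        node.store.allow_mmap: false
---
apiVersion: kibana.k8s.elastic.co/v1
kind: Kibana
metadata:
  name: quickstart
spec:
  version: 8.14.0
  count: 1
  elasticsearchRef:
    name: quickstart
EOF
```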
The Kubernetes Logging operator is used for collecting and forwarding log files. You can deploy it with the following Helm command:
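For instance, using the chart published by the kube-logging project (no version pinned here, so the latest one is installed):

```sh
helm upgrade --install --wait --create-namespace --namespace logging \
  logging-operator oci://ghcr.io/kube-logging/helm-charts/logging-operator
```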
Next, configure a <span class="inline-code">Logging</span> instance:
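A minimal sketch of such a resource could look like this (the resource name and control namespace are arbitrary choices):

```yaml
apiVersion: logging.banzaicloud.io/v1beta1
kind: Logging
metadata:
  name: logging
spec:
  controlNamespace: logging   # namespace in which fluentd and the pipeline resources live
  fluentd: {}                 # deploy the central fluentd collector with default settings
  fluentbit: {}               # deploy the fluentbit daemonset on each worker node
```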
This installs an agent (<span class="inline-code">fluentbit</span>) on each worker node of the Kubernetes cluster, which collects the logs of the pods on that node, enriches them with Kubernetes metadata such as pod labels, and forwards them to a collector service (<span class="inline-code">fluentd</span>). There they are filtered and transformed as per the current configuration and finally forwarded to Elasticsearch. By using log4j2’s JsonTemplateLayout for Flink log messages (configured here), they are emitted in a structured form, making them very easy to process:
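As a sketch, this layout could be set up via the <span class="inline-code">logConfiguration</span> section of the <span class="inline-code">FlinkDeployment</span> resource, assuming the JSON template layout module is available on the Flink class path (the exact configuration used in the example repository may differ):

```yaml
spec:
  logConfiguration:
    log4j-console.properties: |
      rootLogger.level = INFO
      rootLogger.appenderRef.console.ref = ConsoleAppender
      appender.console.name = ConsoleAppender
      appender.console.type = CONSOLE
      # emit log records as JSON, using the Elastic Common Schema template
      appender.console.layout.type = JsonTemplateLayout
      appender.console.layout.eventTemplateUri = classpath:EcsLayout.json
```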
The ingestion pipeline into Elasticsearch is set up via two resources processed by the Kubernetes Logging operator. The first is a <span class="inline-code">ClusterOutput</span> resource, describing the destination of the pipeline, i.e. the Elasticsearch endpoint, the credentials, etc.:
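A sketch of this resource, pointing to the <span class="inline-code">quickstart</span> Elasticsearch cluster created above (the service and secret names follow ECK’s naming conventions; adjust namespaces to your setup):

```yaml
apiVersion: logging.banzaicloud.io/v1beta1
kind: ClusterOutput
metadata:
  name: elastic-output
  namespace: logging                 # must live in the control namespace of the Logging resource
spec:
  elasticsearch:
    host: quickstart-es-http.default.svc.cluster.local
    port: 9200
    scheme: https
    ssl_verify: false                # ECK uses a self-signed certificate by default
    user: elastic
    password:
      valueFrom:
        secretKeyRef:
          name: quickstart-es-elastic-user   # secret created by ECK; copy it to the control namespace if required
          key: elastic
```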
<div style="text-align: center">Listing 3: Resource definition elastic-output.yaml, describing the output of the logging pipeline</div>
Create the output:
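Assuming the resource is stored in <span class="inline-code">elastic-output.yaml</span> as per Listing 3:

```sh
kubectl apply -f elastic-output.yaml
```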
The second resource is of type <span class="inline-code">Flow</span> and describes the set of pods to ingest from (leveraging the component labels set up by the Flink Kubernetes operator), as well as the parsing logic and destination of the log pipeline:
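A sketch of such a flow, selecting the Flink pods via their <span class="inline-code">component</span> labels, parsing the JSON log lines, and referencing the cluster output defined before:

```yaml
apiVersion: logging.banzaicloud.io/v1beta1
kind: Flow
metadata:
  name: elastic-flow
  namespace: default        # namespace of the Flink deployments
spec:
  match:
    - select:
        labels:
          component: jobmanager
    - select:
        labels:
          component: taskmanager
  filters:
    - parser:
        remove_key_name_field: true
        reserve_data: true
        parse:
          type: json       # parse the JSON log lines emitted via JsonTemplateLayout
  globalOutputRefs:
    - elastic-output       # the ClusterOutput created before
```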
<div style="text-align: center">Listing 4: Resource definition elastic-flow.yaml, describing the logic of the logging pipeline</div>
Create the flow:
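Assuming the resource is stored in <span class="inline-code">elastic-flow.yaml</span> as per Listing 4:

```sh
kubectl apply -f elastic-flow.yaml
```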
Once these resources are in place, the Kubernetes Logging operator will set up the required infrastructure for extracting and propagating the Flink logs, utilizing fluentbit and fluentd. The logs will be ingested into Elasticsearch, and you can inspect and query them using Kibana. To access Kibana, retrieve the password of the <span class="inline-code">elastic</span> user like so:
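With the ECK quickstart resources from above, the following command prints the password:

```sh
kubectl get secret quickstart-es-elastic-user \
  -o go-template='{{.data.elastic | base64decode}}'
```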
Then forward port 5601 of the “quickstart-kb-http” service:
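For example:

```sh
kubectl port-forward service/quickstart-kb-http 5601
```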
Navigate to https://localhost:5601/ (accept the browser warning caused by the self-signed certificate) and log in, then go to “Analytics” → “Discover” and create a data view for the index named <span class="inline-code">fluentd</span>.
As mentioned before, this is just scratching the surface when it comes to setting up observability infrastructure for Flink. There’s a wide range of tools and platforms in that space, and you should choose what matches your requirements, integrating with the solutions and infrastructure already in use within your organization. Besides logging, your observability strategy will typically also cover metrics (which are supported by Flink via a range of metrics exporters, for instance for Prometheus, InfluxDB, and Datadog) and traces (e.g. supported via the OpenTelemetry trace exporter). Java-specific tools such as JDK Flight Recorder can also be invaluable for gaining insight into the performance characteristics and other runtime behaviors of your Flink jobs.
Bonus: Managing Flink Jobs With the Heimdall UI
For working with the Flink web UI in a production deployment, the operator optionally sets up a Kubernetes Ingress resource for each job, making its UI available for external access. To do so, simply add the following section to the job’s resource definition:
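A sketch of this section, assuming the NGINX ingress controller and path-based routing matching the URL shown below (template and annotations may need to be adapted to your controller):

```yaml
spec:
  ingress:
    template: "/{{name}}(/|$)(.*)"
    className: nginx
    annotations:
      # strip the job-name prefix before forwarding requests to the Flink web UI
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
```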
You’ll also need an ingress controller, which you can install for the local kind cluster like so:
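For instance, the NGINX ingress controller can be deployed using its kind-specific manifest (consider pinning a released version rather than <span class="inline-code">main</span> for reproducible setups):

```sh
kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/main/deploy/static/provider/kind/deploy.yaml
```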
Once the controller is running, you can access the Flink web UI from your local host without requiring any port forwarding, for instance at http://localhost/custom-job-ha/ for the <span class="inline-code">custom-job-ha</span> job.
With a large number of jobs it can become challenging to keep track of all the deployed jobs and where to find their UIs, logs, or metrics. This is where Heimdall, a project by Yaroslav Tkachenko, comes into the picture: it provides one unified UI for accessing all the Flink jobs executed via the operator on one Kubernetes cluster. You can install it into the cluster by running this:
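The exact steps depend on how you prefer to deploy Heimdall; as a rough sketch, assuming its Helm chart is installed from a local checkout of the project (the chart path is an assumption, consult the Heimdall README for authoritative instructions):

```sh
# clone the project and install its Helm chart into the current namespace
git clone https://github.com/sap1ens/heimdall.git
helm install heimdall ./heimdall/charts/heimdall
```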
Once running, the UI provides a list of all the deployed jobs, their status, resources, etc. Forward its port to access it from your host:
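Assuming the release exposes a service named <span class="inline-code">heimdall</span> listening on port 8080:

```sh
kubectl port-forward service/heimdall 8080
```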
The endpoint URLs in the Heimdall UI are customizable, so that you can easily link to the right locations of the Flink UI, API, metrics, and logs in your specific environment.
Summary and Discussion
The Apache Flink Kubernetes Operator is becoming increasingly popular for running Flink jobs on Kubernetes. Following the well-established Kubernetes operator pattern, it allows you to deploy and run your stream processing jobs by creating declarative resources which are processed in a reconciliation loop by the operator. This means you get to focus on the “What?” of your job, with a lot of the “How?” being taken care of automatically for you. Rather than having to deal with details such as configuring deployments, services, and ingresses, the operator lets you express your intent in a higher-level resource and derives all the required lower-level resources from that.
There are quite a few features of the operator which we couldn’t discuss in this post, most prominently its capabilities for auto-scaling (i.e. scaling jobs automatically up and down in order to minimize back pressure while also satisfying given utilization targets) and auto-tuning Flink jobs (automatically adjusting the memory consumed by a job). Both are very promising, warranting their own article.
<div class="side-note">The Flink Kubernetes operator is a versatile solution for deploying and managing Flink jobs. When it comes to building a full-fledged Flink-based stream processing platform for an organization, there’s a non-trivial amount of additional aspects to keep in mind, including the following:<ul><li>The management and evolution of data schemas</li><li>Developer experience, for instance the ability to deploy SQL jobs and preview their results</li><li>Higher-level resources such as source and sink connectors or declarative end-to-end data pipelines</li><li>Cost attribution and quota management between the teams and departments of a larger organization</li><li>Security and Compliance with regulatory requirements such as HIPPA, GDPR, SOC 2</li><li>Operator upgrades as well as upgrades of (stateful) Flink jobs</li></ul>If all this sounds like it’s too much work, a fully-managed platform like Decodable might be interesting to you. Sign up for a free trial today and give it a try.</div>
Another interesting feature of the operator is the ability to customize its behavior via plug-ins. This can be useful if for instance you want to make sure all deployed jobs adhere to specific standards (by implementing a custom resource validator) or you want to amend their configuration upon deployment, e.g. to add specific pod labels automatically (by implementing a resource mutator).
The Flink Kubernetes operator is under active development and definitely a project worth keeping an eye on. For upcoming versions, the team has planned to work on a better “rollback mechanism and stability conditions” as well as several improvements to the autoscaler.