Since logical decoding was added to Postgres in version 9.4, this powerful feature for capturing changes from the database's write-ahead log has been continuously improved. Postgres 15, released in October 2022, added support for fine-grained control over which columns (by means of column lists) and rows (via row filters) should be exported from captured tables. This means, in relational terminology, projections and filters are now natively supported by Postgres change event publications.
Reasons for specifically configuring which columns and rows should be contained in a change data stream are manifold:
- Excluding large columns (say, a binary column with image data) can significantly reduce the size of change events and thus the required network bandwidth
- Excluding columns or rows with sensitive data can be necessary in order to satisfy privacy requirements, when for instance Personally Identifiable Information (PII) shouldn’t be exposed to external systems
- Filtering published rows by tenant id can be useful for setting up tenant-specific change streams in a multi-tenant architecture
Before the advent of Postgres-native column lists and row filters, users of Debezium – a popular open-source platform for change data capture (CDC), which also is used by several Decodable CDC connectors – would typically have implemented these kinds of use cases via a combination of configuration options and single message transformations (SMTs).
Projections are supported in Debezium via the column.include.list and column.exclude.list options. These configuration options are applied client-side, i.e. within the Debezium connector, which makes them less efficient than server-side column lists, potentially causing large amounts of data to be streamed to Debezium, only to be discarded there.
Filters are a bit more involved: while there is built-in support for filtering the contents of initial and ad-hoc incremental snapshots, filtering change events emitted from the WAL requires a custom SMT. Pushing this logic into the logical replication mechanism of the database itself makes a lot of sense from a usability and efficiency perspective.
So let’s see how Postgres 15 row filters can be used together with Debezium. Initially, I meant to demonstrate the usage of column lists, too. But in the course of exploring that feature, I discovered a bug in Postgres which causes incorrect events to be emitted for UPDATE and DELETE statements when column lists are present. So this will have to wait for another time. The Postgres community took care of this super fast: a bug fix has already been applied, so that column lists should work as expected in the next Postgres release.
Using Logical Decoding Row Filters With Debezium
To follow along, check out the postgres-publication-filtering demo project from GitHub. It contains a Docker Compose file for running Postgres as well as Apache Kafka and Kafka Connect with Debezium:
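Starting the stack is then just a matter of bringing up that Compose file from the root of the checked-out project (the exact service names and ports are defined in the file itself):

```sh
docker compose up -d
```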
The Postgres example container image contains a products table with the following schema:
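The exact DDL ships with the demo image; the following sketch shows a plausible shape of the table, assuming the inventory schema used by the standard Debezium example images. Only the quantity column is essential for what follows:

```sql
CREATE TABLE inventory.products (
  id          SERIAL NOT NULL PRIMARY KEY,
  name        VARCHAR(255) NOT NULL,
  description VARCHAR(512),
  weight      FLOAT,
  quantity    INTEGER NOT NULL DEFAULT 0
);
```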
Let’s set up a change event stream for that table which only contains events if the quantity of the given product item is below 10. We could then for instance envision a microservice which subscribes to that stream and places backfill orders with our suppliers for those products.
Row filters are configured via Postgres publications, as used with the pgoutput logical decoding plug-in. As Debezium can only create publications with the default settings (at least for now), you need to manually create a custom publication with the required configurations and have Debezium make use of it. To do so, launch a Postgres session via pgcli:
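For instance (host, port, and credentials are assumptions; adjust them to whatever the demo's Compose file defines):

```sh
pgcli postgresql://postgres:postgres@localhost:5432/postgres
```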
Then create a publication like so:
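A minimal sketch, using low_stock_publication as an arbitrarily chosen publication name and assuming the products table lives in the inventory schema:

```sql
CREATE PUBLICATION low_stock_publication FOR TABLE inventory.products
  WHERE (quantity < 10);
```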
As of Postgres 15, the CREATE PUBLICATION statement allows you to narrow down the events to be emitted for a given table via a custom WHERE clause. A few conditions apply to that clause (see this post for more information), most importantly:
- If the publication publishes UPDATE or DELETE events, only columns which are part of the table’s replica identity may be referenced (see the example after this list)
- Only simple expressions are allowed, for example not referring to user-defined functions or types, system columns etc.
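The first condition matters for our example: the filter references quantity, which is not covered by the default replica identity (the primary key). Assuming the demo table has no unique index including that column, one way to satisfy the condition is a full replica identity:

```sql
-- Publish the complete old row image, so that quantity may be referenced
-- by the row filter for UPDATE and DELETE events
ALTER TABLE inventory.products REPLICA IDENTITY FULL;
```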
That’s all we need to do on the Postgres side. Now let’s take a look at the required Debezium configuration:
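The essential settings are the pgoutput plug-in and the name of the manually created publication; the name below matches the publication created above:

```json
{
  "plugin.name": "pgoutput",
  "publication.name": "low_stock_publication"
}
```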
As the Postgres publication is only used when Debezium retrieves change events via logical decoding from the WAL, you also need to customize the SELECT statement used for the products table when snapshotting it. Otherwise, you’d get snapshot events for all the rows of that table, no matter what their quantity is. This can be done via the following configuration:
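A sketch of the relevant options, again assuming the inventory.products table name from above; the second property follows the snapshot.select.statement.overrides.&lt;schema&gt;.&lt;table&gt; naming pattern:

```json
{
  "snapshot.select.statement.overrides": "inventory.products",
  "snapshot.select.statement.overrides.inventory.products": "SELECT * FROM inventory.products WHERE quantity < 10"
}
```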
Altogether, the connector configuration looks like this:
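A complete configuration along these lines should do; connector name, hostnames, credentials, and topic prefix are assumptions matching a typical Debezium example setup, shown in Debezium 2.x style with topic.prefix:

```json
{
  "name": "low-stock-products-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "topic.prefix": "dbserver1",
    "table.include.list": "inventory.products",
    "plugin.name": "pgoutput",
    "publication.name": "low_stock_publication",
    "snapshot.select.statement.overrides": "inventory.products",
    "snapshot.select.statement.overrides.inventory.products": "SELECT * FROM inventory.products WHERE quantity < 10"
  }
}
```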
Now register a connector instance with this configuration. If you have kcctl 🧸 installed (which I highly recommend), that’s as simple as this:
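Assuming the configuration above is stored in a file named low-stock-products-connector.json:

```sh
kcctl apply -f low-stock-products-connector.json
```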
Alternatively, use curl to post the configuration directly to Kafka Connect’s REST API:
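This assumes the Kafka Connect REST API is exposed at localhost:8083, as is common in Debezium example setups:

```sh
curl -X POST -H "Content-Type: application/json" \
  --data @low-stock-products-connector.json \
  http://localhost:8083/connectors
```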
Observing Filtered Change Events
Return to your Postgres session and display the contents of the products table:
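For example (again assuming the inventory schema):

```sql
SELECT * FROM inventory.products;
```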
Out of those nine product items, only those with a quantity of less than ten show up as snapshot events in the corresponding Kafka topic:
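One way to inspect the topic is kcat; the broker address and topic name here are assumptions derived from the connector configuration above, following the &lt;topic.prefix&gt;.&lt;schema&gt;.&lt;table&gt; pattern:

```sh
kcat -b localhost:9092 -C -o beginning -q -t dbserver1.inventory.products
```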
Now let’s do some data changes and observe the resulting change events, as retrieved from the database via logical decoding. First, insert a few records into the table:
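The exact column values are illustrative; the quantities, though, match the changes discussed below:

```sql
INSERT INTO inventory.products (name, description, weight, quantity)
VALUES
  ('deck chair', 'Foldable wooden deck chair', 5.5, 7),
  ('paint', 'Outdoor paint, 10 l bucket', 12.0, 15),
  ('lamp', 'LED desk lamp', 0.8, 3);
```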
Nothing too exciting is happening in the Kafka topic: as you would expect, only events for the deck chair and the lamp products show up in Kafka, but not for the paint item, as its quantity is larger than 10. Things get a bit more interesting when doing some updates:
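The following updates, matching the quantity changes described below, illustrate the different cases:

```sql
UPDATE inventory.products SET quantity = 6  WHERE name = 'deck chair'; -- 7 -> 6
UPDATE inventory.products SET quantity = 14 WHERE name = 'paint';      -- 15 -> 14
UPDATE inventory.products SET quantity = 9  WHERE name = 'paint';      -- 14 -> 9
UPDATE inventory.products SET quantity = 11 WHERE name = 'lamp';       -- 3 -> 11
```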
The following three events are emitted to Kafka for those:
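Heavily abbreviated, the event payloads look roughly like this (ids, the source block, timestamps, and most columns are omitted; the exact shape depends on the converter configuration):

```json
{ "op": "u", "before": { "name": "deck chair", "quantity": 7 }, "after": { "name": "deck chair", "quantity": 6 } }
{ "op": "c", "before": null, "after": { "name": "paint", "quantity": 9 } }
{ "op": "d", "before": { "name": "lamp", "quantity": 3 }, "after": null }
```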
Note that not all of them have the u (update) operation type; some are c (create) and d (delete) events. The logic here is that the publication looks at each change from the perspective of the row set specified via the table’s WHERE clause. In that light,
- An update event is emitted for the deck chair quantity update from 7 to 6
- No event is emitted for the paint quantity update from 15 to 14, as that row is part of the row set neither before nor after the change
- A create event is emitted for the paint quantity update from 14 to 9, as that row has now become part of the row set
- A delete event is emitted for the lamp quantity update from 3 to 11, as that row is no longer part of the row set
Finally, let’s delete some product items:
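For example:

```sql
DELETE FROM inventory.products WHERE name = 'lamp';       -- quantity 11, above the threshold
DELETE FROM inventory.products WHERE name = 'deck chair'; -- quantity 6, below the threshold
```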
In the Kafka topic you can observe that no change event is emitted for the first deletion (as there are 11 lamps in stock), but there is an event for the deletion of the deck chair record with a quantity of six.
As they say, a picture is worth a thousand words (and I’d never pass on an opportunity for using my favorite tool Excalidraw), so here is an overview of the published events, depending on the specifics of a given data change:
Wrap-Up
Row filters (and column lists) are a great addition to the Postgres logical decoding toolbox. Having fine-grained control over which change events should be published and which fields they should contain opens up many interesting opportunities from an efficiency and data privacy perspective, as well as the ability to set up content-specific change data streams, as demonstrated in the example above.
Going forward, a good next step usability-wise would be for Debezium to apply any configured row filters and column lists to the Postgres publications it creates, simplifying things for users a bit. As far as Flink SQL and Decodable are concerned, row filters and column lists potentially allow for the push-down of filter and projection operators of streaming SQL queries: instead of applying these operators within the Flink stream processing engine, the SELECT and WHERE clauses of queries could be rewritten transparently and these operators executed as part of the logical replication publication within Postgres itself. Flink supports this kind of push-down of logic into data sources via the SupportsFilterPushDown and SupportsProjectionPushDown extension points. For example, this could be very interesting to customers who don't want specific segments of their data to leave the realm of their database. Please reach out to us if you think this would be an interesting capability to have.
If you would like to get started with your own experiments around Postgres 15 row filters using Debezium, you can find the complete source code of the example shown above in this repository on GitHub. You can find more information about row filters in this blog post; also refer to this post to learn more about this and other new features related to logical replication in Postgres 15.
Many thanks to Robert Metzger for his feedback while writing this post!