Back
March 6, 2023
12
min read

The Wonders of Postgres Logical Decoding Messages for CDC

By
Gunnar Morling
Share this post

This article originally appeared at InfoQ.

Did you know there’s a function in Postgres that lets you write data which you can’t query? A function that lets you persist data in all kinds and shapes but which will never show up in any table? Let me tell you about <span class="inline-code">pg_logical_emit_message()</span>! It’s a Postgres function that allows you to write messages to the write-ahead log (WAL) of the database.

You can then use logical decoding—Postgres’ change data capture capability—to retrieve those messages from the WAL, process them, and relay them to external consumers.

In this article, we’ll explore how to take advantage of this feature for implementing three different use cases:

  • Propagating data between microservices via the outbox pattern
  • Application logging
  • Enriching audit logs with metadata

For retrieving logical decoding messages from Postgres we are going to use Debezium, a popular open-source platform for log-based change data capture (CDC), which can stream data changes from a large variety of databases into data streaming platforms like Apache Kafka or AWS Kinesis.

We’ll also use Apache Flink and the Flink CDC project, which seamlessly integrates Debezium into the Flink ecosystem, for enriching and routing raw change event streams. You can learn more about the foundations of change data capture and Debezium in this talk from QCon San Francisco.

Logical Decoding Messages 101

Before diving into specific use cases, let’s take a look at how logical decoding messages can be emitted and consumed. To follow along, make sure to have Docker installed on your machine. Start by checking out this example project from GitHub:

git clone https://github.com/decodableco/examples.git
cd examples/postgres-logical-decoding

The project contains a Docker Compose file for running a Postgres database, which is enabled for logical replication already. Start it like so:

docker compose up

Then, in another terminal window, connect to that Postgres instance using the pgcli command line client:

docker run --tty --rm -i \
  --network logical-decoding-network \
  quay.io/debezium/tooling:1.2 bash -c \
  'pgcli postgresql://postgresuser:postgrespw@postgres:5432/demodb'

Next, you need to create a replication slot. A replication slot represents one specific stream of changes coming from a Postgres database and keeps track of how far a consumer has processed this stream. For this purpose, it stores the latest log sequence number (LSN) that the slot’s consumer has processed and acknowledged.

Each slot has a name and an assigned decoding plug-in which defines the format of that stream. Create a slot using the “test_decoding” plug-in, which emits changes in a simple text-based protocol, like this:

postgresuser@postgres:demodb> SELECT * FROM pg_create_logical_replication_slot('demo_slot', 'test_decoding');

+-------------+-----------+
| slot_name   | lsn       |
|-------------+-----------|
| demo_slot   | 0/1A24E38 |
+-------------+-----------+

For production scenarios it is recommended to use the pgoutput plug-in, which emits change events using an efficient Postgres-specific binary format and is available by default in Postgres since version 10. Other commonly used options include the Decoderbufs plug-in (based on the Google Protocol Buffers format) and wal2json (emitting change events as JSON).

Changes are typically retrieved from remote clients such as Debezium by establishing a replication stream with the database. Alternatively, you can use the function <span class="inline-code">pg_logical_slot_get_changes()</span>, which lets you fetch changes from a given replication slot via SQL, optionally reading only up to a specific LSN (the first NULL parameter) or only a specific number of changes (the second NULL parameter). This comes in handy for testing purposes:

postgresuser@postgres:demodb> SELECT * FROM pg_logical_slot_get_changes('demo_slot', NULL, NULL);

+-------+-------+--------+
| lsn   | xid   | data   |
|-------+-------+--------|
+-------+-------+--------+

No changes should be returned at this point. Let’s insert a logical decoding message using the <span class="inline-code">pg_logical_emit_message()</span> function:

postgresuser@postgres:demodb> SELECT * FROM pg_logical_emit_message(true, 'context', 'Hello World!');

+---------------------------+
| pg_logical_emit_message   |
|---------------------------|
| 0/1A24F68                 |
+---------------------------+

The function has three parameters:

  • <span class="inline-code">transactional</span>: a boolean flag indicating whether the message should be transactional or not; when issued while a transaction is pending and that transaction gets rolled back eventually, a transactional message would not be emitted, whereas a non-transactional message would be written to the WAL nevertheless
  • <span class="inline-code">prefix</span>: a textual identifier for categorizing messages; for instance, this could indicate the type of a specific message
  • <span class="inline-code">content</span>: the actual payload of the message, either as text or binary data; you have full flexibility of what to emit here, e.g., in regard to format, schema, and semantics

When you retrieve changes from the slot again after having emitted a message, you now should see three change events: a <span class="inline-code">BEGIN</span> and a <span class="inline-code">COMMIT</span> event for the implicitly created transaction when emitting the event, and the “Hello World!” message itself. Note that this message doesn’t appear in any Postgres table or view as would be the case when adding data using the <span class="inline-code">INSERT</span> statement; this message is solely present in the database's transaction log.

There are a few other useful functions dealing with logical decoding messages and replication slots, including the following:

  • <span class="inline-code">pg_logical_slot_get_binary_changes()</span>: retrieves binary messages from a slot
  • <span class="inline-code">pg_logical_slot_peek_changes()</span>: allows to take a look at changes from a slot without advancing it
  • <span class="inline-code">pg_replication_slot_advance()</span>: advances a replication slot
  • <span class="inline-code">pg_drop_replication_slot()</span>: deletes a replication slot

You also can query the <span class="inline-code">pg_replication_slots</span> view for examining the current status of your replication slots, latest confirmed LSN, and more.

Use Cases

Having discussed the foundations of logical decoding messages, let’s now explore a few use cases of this useful Postgres API.

The Outbox Pattern

For microservices, it’s a common requirement that, when processing a request, a service needs to update its own database and simultaneously send a message to other services. As an example, consider a “fulfillment” service in an e-commerce scenario: when the status of a shipment changes from <span class="inline-code">READY_TO_SHIP</span> to <span class="inline-code">SHIPPED</span>, the shipment’s record in the fulfillment service database needs to be updated accordingly, but also a message should be sent to the “customer” service so that it can update the customer’s account history and trigger an email notification for the customer.

Now, when using data streaming platforms like Apache Kafka for connecting your services, you can’t reliably implement this scenario by just letting the fulfillment service issue its local database transaction and then send a message via Kafka. The reason is that it is not supported to have shared transactions for a database and Kafka (in technical terms, Kafka can’t participate in distributed transaction protocols like XA). While everything looks fine on the surface, you can end up with an inconsistent state in case of failures. The database transaction could get committed, but sending out the notification via Kafka fails. Or, the other way around: the customer service gets notified, but the local database transaction gets rolled back.

While you can find this kind of implementation in many applications, always remember: “Friends don’t let friends do dual writes”! A solution to this problem is the outbox pattern: instead of trying to update two resources at once (a database and Kafka), you only update a single one—the service’s database. When updating the shipment state in the database, you also write the message to be sent to an outbox table; this happens as part of one shared transaction, i.e., applying the atomicity guarantees you get from ACID transactions. Either the shipment state update and the outbox message get persisted, or none of them do. You then use change data capture to retrieve any inserts from the outbox in the database and propagate them to consumers.

More information about the outbox pattern can be found in this blog post on the Debezium blog. Another resource is this article on InfoQ which discusses how the outbox pattern can be used as the foundation for implementing Sagas between multiple services. In the following, I’d like to dive into one particular implementation approach for the pattern. Instead of inserting outbox events in a dedicated outbox table, the idea is to emit them just as logical decoding messages to the WAL.

There are pros and cons to either approach. What makes the route via logical decoding messages compelling is that it avoids any housekeeping needs. Unlike with an outbox table, there’s no need to remove messages after they have been consumed from the transaction log. Also, this emphasizes the nature of an outbox being an append-only medium: messages must never be modified after being added to the outbox, which might happen by accident with a table-based approach.

Regarding the content of outbox messages, you have full flexibility there in general. Sticking to the e-commerce domain from above, it could, for instance, describe a shipment serialized as JSON, Apache Avro, Google Protocol Buffers, or any other format you choose. What’s important to keep in mind is that while the message content doesn’t adhere to any specific table schema from a database perspective, it’s subject to an (ideally explicit) contract between the sending application and any message consumers. In particular, the schema of any emitted events should only be modified if you keep in mind the impact on consumers and backward compatibility.

One commonly used approach is to look at the design of outbox events and their schemas from a domain-driven design perspective. Specifically, Debezium recommends that your messages have the following attributes:

  • id: a unique message id, e.g., a UUID, which consumers can use for deduplication purposes
  • aggregate type: describes the kind of aggregate an event is about, e.g., “customer,” “shipment,” or “purchase order”; when propagating outbox events via Kafka or other streaming platforms, this can be used for sending events of one aggregate type to a specific topic
  • aggregate id: the id of the aggregate an event is about, e.g., a customer or order id; this can be used as the record key in Kafka, thus ensuring all events pertaining to one aggregate will go to the same topic partition and making sure consumers receive these events in the correct order
  • payload: the actual message payload; unlike “raw” table-level CDC events, this can be a rich structure, representing an entire aggregate and all its parts, which in the database itself may spread across multiple tables
Figure 1: Routing outbox events from the transaction log to different Kafka topics.

Enough of the theory—let’s see how a database transaction could look, which emits a logical decoding message with an outbox event. In the accompanying GitHub repository, you can find a Docker Compose file for spinning up all the required components and detailed instructions for running the complete example yourself. Emit an outbox message like this:

postgresuser@postgres:demodb> SELECT * FROM pg_logical_emit_message(
  true,
  'outbox',
  '{
    "id" : "298c2cc3-71bb-4d2b-b5b4-1b14006d56e6",
    "aggregate_type" : "shipment",
    "aggregate_id" : 42,
    "payload" : {
      "customer_id" : 7398,
      "item_id" : 8123,
      "status" : "SHIPPED",
      "numberOfPackages" : 3,
      "address" : "Bob Summers, 12 Main St., 90210, Los Angeles/CA, US"
    }
  }'
 );
 

This creates a transactional message (i.e., it would not be emitted if the transaction aborts, e.g., because of a constraint violation of another record inserted in the same transaction). It uses the “outbox” prefix (allowing it to distinguish it from messages of other types) and contains a JSON message as the actual payload.

Regarding retrieving change events and propagating them to Kafka, the details depend on how exactly Debezium, as the underlying CDC tool, is deployed. When used with Kafka Connect, Debezium provides a single message transform (SMT) that supports outbox tables and, for instance, routes outbox events to different topics in Kafka based on a configurable column containing the aggregate type. However, this SMT doesn’t yet support using logical decoding messages as the outbox format.

When using Debezium via Flink CDC, you could implement a similar logic using a custom <span class="inline-code">KafkaRecordSerializationSchema</span> which routes outbox events to the right Kafka topic and propagates the aggregate id to the Kafka message key, thus ensuring correct ordering semantics. A basic implementation of this could look like this (you can find the complete source code, including the usage of this serializer in a Flink job here):

public class OutboxSerializer implements KafkaRecordSerializationSchema {

  private static final long serialVersionUID = 1L;

  private ObjectMapper mapper;

  @Override
  public ProducerRecord serialize(ChangeEvent element, 
      KafkaSinkContext context, Long timestamp) {
    try {
      JsonNode content = element.getMessage().getContent();


      ProducerRecord record =
          new ProducerRecord(
        content.get("aggregate_type").asText(),
        content.get("aggregate_id").asText().getBytes(Charsets.UTF_8),
        mapper.writeValueAsBytes(content.get("payload"))
      );

      record.headers().add("message_id",      
          content.get("id").asText().getBytes(Charsets.UTF_8));

      return record;
    }
    catch (JsonProcessingException e) {
      throw new IllegalArgumentException(
          "Couldn't serialize outbox message", e);
    }
  }

  @Override
  public void open(InitializationContext context,
    KafkaSinkContext sinkContext) throws Exception {


    mapper = new ObjectMapper();
    SimpleModule module = new SimpleModule();
    module.addDeserializer(Message.class, new MessageDeserializer());
        mapper.registerModule(module);
  }
}

With that Flink job in place, you’ll be able to examine the outbox message on the “shipment” Kafka topic like so:

docker run --tty --rm \
  --network logical-decoding-network \
  quay.io/debezium/tooling:1.2 \
  kcat -b kafka:9092 -C -o beginning -q -t shipment \
  -f '%k -- %h -- %s\n'

42 -- message_id=298c2cc3-71bb-4d2b-b5b4-1b14006d56e6 -- {"customer_id":7398,"item_id":8123,"status":"SHIPPED","numberOfPackages":3,"address":"Bob Summers, 12 Main St., 90210, Los Angeles/CA, US"}

The topic name corresponds to the specified aggregate type, i.e., if you were to issue outbox events for other aggregate types, they’d be routed to different topics accordingly. The message key is 42, matching the aggregate id. The unique event id is propagated as a Kafka message header, enabling consumers to implement efficient deduplication by keeping track of the ids they’ve already received and processed and ignoring any potential duplicates they may encounter. Lastly, the payload of the outbox event is propagated as the Kafka message value.

In particular, in larger organizations with a diverse set of event producers and consumers, it makes sense to align on a shared event envelope format, which standardizes common attributes like event timestamp, origin, partitioning key, schema URLs, and others. The CloudEvents specification comes in handy here, especially for defining event types and their schemas. It is an option worth considering to have your applications emit outbox events adhering to the CloudEvents standard.

Logging

While log management of modern applications typically happens through dedicated platforms like Datadog or Splunk, which ingest changes from dedicated APIs or logs in the file system, it sometimes can be convenient to persist log messages in the database of an application. Log libraries such as the widely used log4j 2 provide database-backed appenders for this purpose. These will typically require a second connection for the logger, though, because in case of a rollback of an application transaction itself, you still (and in particular then) want to write out any log messages, helping you with failure analysis.

Non-transactional logical decoding messages can be a nice means of using a single connection and still ensuring that log messages persist, also when a transaction is rolled back. For example, let’s consider the following situation with two transactions, one of which is committed and one rolled back:

Figure 2: Using non-transactional logical decoding messages for logging purposes.

To follow along, run the following sequence of statements in the pgcli shell:

–- Assuming this table: CREATE TABLE data (id INTEGER, value TEXT);

BEGIN;
INSERT INTO data(id, value) VALUES('1', 'foo');
SELECT * FROM pg_logical_emit_message(false, 'log', 'OK');
INSERT INTO data(id, value) VALUES('2', 'bar');
COMMIT;

BEGIN;
INSERT INTO data(id, value) VALUES('3', 'baz');
SELECT * FROM pg_logical_emit_message(false, 'log', 'ERROR');
INSERT INTO data(id, value) VALUES('4', 'qux');
ROLLBACK;

The first transaction inserts two records in a new table, “data” and also emits a logical decoding message. The second transaction applies similar changes but then is rolled back. When retrieving the change events from the replication slot (using the “testing” decoding plug-in as shown above), the following events will be returned:

postgresuser@postgres:demodb> SELECT * FROM pg_logical_slot_peek_changes('demo_slot', NULL, NULL) order by lsn;

+-----------+-------+------------------------------------------------------------+
| lsn       | xid   | data                                                       |
|-----------+-------+------------------------------------------------------------|
| 0/1A483F8 | 768   | BEGIN 768                                                  |
| 0/1A504B8 | 768   | table public.data: INSERT: id[integer]:1 value[text]:'foo' |
| 0/1A50530 | 768   | message: transactional: 0 prefix: log, sz: 2 content:OK    |
| 0/1A50530 | 768   | table public.data: INSERT: id[integer]:2 value[text]:'bar' |
| 0/1A509B8 | 768   | COMMIT 768                                                 |
| 0/1A50A38 | 769   | message: transactional: 0 prefix: log, sz: 5 content:ERROR |
+-----------+-------+------------------------------------------------------------+

As expected, there are two <span class="inline-code">INSERT</span> events and the log message for the first transaction. However, there are no change events for the aborted transaction for the <span class="inline-code">INSERT</span> statements, as it was rolled back. But as the logical decoding message was non-transactional, it still was written to the WAL and can be retrieved. I.e., you actually can have that cake and eat it too!

Audit Logs

In enterprise applications, keeping an audit log of your data is a common requirement, i.e., a complete trail of all the changes done to a database record, such as a purchase order or a customer.

There are multiple possible approaches for building such an audit log; one of them is to copy earlier record versions into a separate history table whenever a data change is made. Arguably, this increases application complexity. Depending on the specific implementation strategy, you might have to deploy triggers for all the tables that should be audited or add libraries such as Hibernate Envers, an extension to the popular Hibernate object-relational mapping tool. In addition, there’s a performance impact, as the audit records are inserted as part of the application’s transactions, thus increasing write latency.

Change data capture is an interesting alternative for building audit logs: extracting data changes from the database transaction log requires no changes to writing applications. A change event stream, with events for all the inserts, updates, and deletes executed for a table—e.g., persisted as a topic in Apache Kafka, whose records are immutable by definition—could be considered a simple form of an audit log. As the CDC process runs asynchronously, there’s no latency impact on writing transactions.

One shortcoming of this approach—at least in its most basic form—is that it doesn’t capture contextual metadata, like the application user making a given change, client information like device configuration or IP address, use case identifiers, etc. Typically, this data is not stored in the business tables of an application and thus isn’t exposed in raw change data events. 

The combination of logical decoding messages and stream processing, with Apache Flink, can provide a solution here. At the beginning of each transaction, the source application writes all the required metadata into a message; in comparison to writing a full history entry for each modified record, this just adds a small overhead on the write path. You can then use a simple Flink job for enriching all the subsequent change events from that same transaction with that metadata. As all change events emitted by Debezium contain the id of the transaction they originate from, including logical decoding messages, correlating the events of one transaction isn’t complicated. The following image shows the general idea:

Figure 3: Enriching data change events with transaction-scoped audit metadata

When it comes to implementing this logic with Apache Flink, you can do this using a rather simple mapping function, specifically by implementing the <span class="inline-code">RichFlatMapFunction</span> interface, which allows you to combine the enrichment functionality and the removal of the original logical decoding messages in a single operator call:

public void flatMap(String value, Collector out)
    throws Exception {

  ChangeEvent changeEvent = mapper.readValue(value, ChangeEvent.class);
  String op = changeEvent.getOp();
  String txId = changeEvent.getSource().get("txId").asText();

  // logical decoding message
  if (op.equals("m")) {
    Message message = changeEvent.getMessage();


    // an audit metadata message -> remember it
    if (message.getPrefix().equals("audit")) {
      localAuditState = new AuditState(txId, message.getContent());
      return;
    }
    else {
      out.collect(value);
    }
  }
  // a data change event -> enrich it with the metadata
  else {
    if (txId != null && localAuditState != null) {
      if (txId.equals(localAuditState.getTxId())) {
        changeEvent.setAuditData(localAuditState.getState());
      }
    else {
      localAuditState = null;
    }
  }

  changeEvent.setTransaction(null);
  out.collect(mapper.writeValueAsString(changeEvent));
}

The logic is as follows:

  • When the incoming event is of type “m” (i.e., a logical decoding message) and it is an audit metadata event, put the content of the event into a Flink value state
  • When the incoming event is of any other type, and we have stored audit state for the event’s transaction before, enrich the event with that state
  • When the transaction id of the incoming event doesn’t match what’s stored in the audit state (e.g., when a transaction was issued without a metadata event at the beginning), clear the state store and propagate the event as is

You can find a simple yet complete Flink job that runs that mapping function against the Flink CDC connector for Postgres in the aforementioned GitHub repository. See the instructions in the README for running that job, triggering some data changes, and observing the enriched change events. As an example, let’s consider the following transaction which first emits a logical decoding message with the transaction metadata (user name and client IP address) and then two <span class="inline-code">INSERT</span> statements:

BEGIN;
SELECT * FROM pg_logical_emit_message(true, 'audit', '{ "user" : "bob@example.com", "client" : "10.0.0.1" }');
INSERT INTO inventory.customer(first_name, last_name, email) VALUES ('Bob', 'Green', 'bob@example.com');
INSERT INTO inventory.address
  (customer_id, type, line_1, line_2, zip_code, city, country)
VALUES
  (currval('inventory.customer_id_seq'), 'Home', '12 Main St.', 'sdf', '90210', 'Los Angeles', 'US');
COMMIT;

The enriched change events, as emitted by Apache Flink, would look like so:

{
  "op" : "c",
  "ts_ms" : 1673434483049,
  "source" : {
    "connector" : "postgresql",
    "snapshot" : false,
    "db" : "demodb",
    "table" : "customer"
    "lsn" : 24023128,
    "txId" : 555,
    ...
  },
  "before" : null,
  "after" : {
    "id" : 1018,
    "first_name" : "Bob",
    "last_name" : "Green",
    "email" : "bobasdf@example.com"
  },
  "auditData" : {
    "user" : "bob@example.com",
    "client" : "10.0.0.1"
  }
}
{
  "op" : "c",
  "ts_ms" : 1673434483050,
  "source" : {
    "connector" : "postgresql",
    "snapshot" : false,
    "db" : "demodb",
    "table" : "address"
    "lsn" : 24023129,
    "txId" : 555,
    ...
  },
  "before" : null,
  "after" : {
    "id" : 10007,
    "customer_id" : 1018,
    "type" : "Home",
    "line_1" : "12 Main St.",
    "line_2" : "sdf",
    "zip_code" : "90210",
    "city" : "Los Angeles",
    "country" : "US"
  },
  "auditData" : {
    "user" : "bob@example.com",
    "client" : "10.0.0.1"
  }
}

Within the same Flink job, you now could add a sink connector and for instance write the enriched events into a Kafka topic. Alternatively, depending on your business requirements, it can be a good idea to propagate the change events into a queryable store, for instance, an OLAP store like Apache Pinot or Clickhouse. You could use the same approach for enriching change events with contextual metadata for other purposes too, generally speaking for capturing all kinds of “intent” which isn’t directly persisted in the business tables of your application.

Bonus: Advancing Replication Slots

Finally, let’s discuss a technical use case for logical decoding messages: advancing Postgres replication slots. This can come in handy in certain scenarios, where otherwise large segments of the WAL could be retained by the database, eventually causing the database machine to run out of disk space.

This is because replication slots are always created in the context of a specific database, whereas the WAL is shared between all the databases on the same Postgres host. This means a replication slot set up for a database without any data changes and which, therefore, can’t advance, will retain potentially large chunks of WAL if changes are made to another database on the same host.

To experience this situation, stop the currently running Docker Compose set-up and launch this alternative Compose file from the example project:

docker compose -f docker-compose-multi-db.yml up

This spins up a Postgres database container with two databases, DB1 and DB2. Then launch the AdvanceSlotMain class. You can do so via Maven (note this is just for demonstration and development purposes; usually, you’d package up your Flink job as a JAR and deploy it to a running Flink cluster):

mvn exec:exec@advanceslot

It runs a simple Flink pipeline that retrieves all changes from the DB2 database and prints them out on the console. Now, do some changes on the DB1 database:

docker run --tty --rm -i \
  --network logical-decoding-network \
  quay.io/debezium/tooling:1.2 \
  bash -c 'pgcli postgresql://postgresuser:postgrespw@order-db:5432/db1'

postgresuser@order-db:db1> CREATE TABLE data (id INTEGER, value TEXT);
postgresuser@order-db:db1> INSERT INTO data SELECT generate_series(1,1000) AS id, md5(random()::text) AS value;

Query the status of the replication slot (“flink”, set up for database “DB2”), and as you keep running more inserts in DB1, you’ll see that the retained WAL of that slot continuously grows, as long as there are no changes done over in DB2:

postgresuser@order-db:db1> SELECT
  slot_name,
  database,
  pg_size_pretty(
    pg_wal_lsn_diff(
      pg_current_wal_lsn(), restart_lsn)) AS retained_wal,
  active,
  restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
+-----------------+------------+----------------+----------+---------------+-----------------------+
| slot_name       | database   | retained_wal   | active   | restart_lsn   | confirmed_flush_lsn   |
|-----------------+------------+----------------+----------+---------------+-----------------------|
| flink           | db2        | 526 kB         | True     | 0/22BA030     | 0/22BA030             |
+-----------------+------------+----------------+----------+---------------+-----------------------+

The problem is that as long as there are no changes in the DB2 database, the CDC connector of the running Flink job will never be invoked and thus never have a chance to acknowledge the latest processed LSN of its replication slot. Now, let’s use pg_logical_emit_message() to fix this situation. Get another Postgres shell, this time for DB2, and emit a message like so:

docker run --tty --rm -i \
  --network logical-decoding-network \
  quay.io/debezium/tooling:1.2 \
  bash -c 'pgcli postgresql://postgresuser:postgrespw@order-db:5432/db2'

postgresuser@order-db:db2> SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar);

In the console output of AdvanceSlotMain you should see the change event emitted by the Debezium connector for that message. With the next checkpoint issued by Flink (look for “Completed checkpoint XYZ for job …” messages in the log), the LSN of that event will also be flushed to the database, essentially allowing the database to discard any WAL segments before that. If you now examine the replication slot again, you should find that the “retained WAL” value is much lower than before (as this process is asynchronous, it may take a bit until the disk space is freed up).

Wrapping Up

Logical decoding messages are not widely known yet very powerful tools, which should be in the box for every software engineer working with Postgres. As you’ve seen, the ability to emit messages into the write-ahead log without them ever surfacing in any actual table allows for a number of interesting use cases, such as reliable data exchange between microservices (thus avoiding unsafe dual writes), application logging, or providing metadata for building audit logs. Employing stateful stream processing using Apache Flink, you can enrich and route your captured messages as well as apply other operations on your data change events, such as filtering, joining, windowed aggregations, and more.

Where there is great power, there are also great responsibilities. As logical decoding messages don’t have an explicit schema, unlike your database tables, the application developer must define sensible contracts and carefully evolve them, always keeping backward compatibility in mind. The CloudEvents format can be a useful foundation for your custom message schemas, providing all the producers and consumers in an organization with a consistent message structure and well-defined semantics.

If you’d like to get started with your explorations around logical decoding messages, look at the GitHub repo accompanying this article, which contains the source code of all the examples shown above and detailed instructions for running them.

Many thanks to Hans-Peter Grahsl, Robert Metzger, and Srini Penchikala for their feedback while writing this article.

Additional Resources

📫 Email signup 👇

Did you enjoy this issue of Checkpoint Chronicle? Would you like the next edition delivered directly to your email to read from the comfort of your own home?

Simply enter your email address here and we'll send you the next issue as soon as it's published—and nothing else, we promise!

👍 Got it!
Oops! Something went wrong while submitting the form.
Gunnar Morling

Gunnar is an open-source enthusiast at heart, currently working on Apache Flink-based stream processing. In his prior role as a software engineer at Red Hat, he led the Debezium project, a distributed platform for change data capture. He is a Java Champion and has founded multiple open source projects such as JfrUnit, kcctl, and MapStruct.

This article originally appeared at InfoQ.

Did you know there’s a function in Postgres that lets you write data which you can’t query? A function that lets you persist data in all kinds and shapes but which will never show up in any table? Let me tell you about <span class="inline-code">pg_logical_emit_message()</span>! It’s a Postgres function that allows you to write messages to the write-ahead log (WAL) of the database.

You can then use logical decoding—Postgres’ change data capture capability—to retrieve those messages from the WAL, process them, and relay them to external consumers.

In this article, we’ll explore how to take advantage of this feature for implementing three different use cases:

  • Propagating data between microservices via the outbox pattern
  • Application logging
  • Enriching audit logs with metadata

For retrieving logical decoding messages from Postgres we are going to use Debezium, a popular open-source platform for log-based change data capture (CDC), which can stream data changes from a large variety of databases into data streaming platforms like Apache Kafka or AWS Kinesis.

We’ll also use Apache Flink and the Flink CDC project, which seamlessly integrates Debezium into the Flink ecosystem, for enriching and routing raw change event streams. You can learn more about the foundations of change data capture and Debezium in this talk from QCon San Francisco.

Logical Decoding Messages 101

Before diving into specific use cases, let’s take a look at how logical decoding messages can be emitted and consumed. To follow along, make sure to have Docker installed on your machine. Start by checking out this example project from GitHub:

git clone https://github.com/decodableco/examples.git
cd examples/postgres-logical-decoding

The project contains a Docker Compose file for running a Postgres database, which is enabled for logical replication already. Start it like so:

docker compose up

Then, in another terminal window, connect to that Postgres instance using the pgcli command line client:

docker run --tty --rm -i \
  --network logical-decoding-network \
  quay.io/debezium/tooling:1.2 bash -c \
  'pgcli postgresql://postgresuser:postgrespw@postgres:5432/demodb'

Next, you need to create a replication slot. A replication slot represents one specific stream of changes coming from a Postgres database and keeps track of how far a consumer has processed this stream. For this purpose, it stores the latest log sequence number (LSN) that the slot’s consumer has processed and acknowledged.

Each slot has a name and an assigned decoding plug-in which defines the format of that stream. Create a slot using the “test_decoding” plug-in, which emits changes in a simple text-based protocol, like this:

postgresuser@postgres:demodb> SELECT * FROM pg_create_logical_replication_slot('demo_slot', 'test_decoding');

+-------------+-----------+
| slot_name   | lsn       |
|-------------+-----------|
| demo_slot   | 0/1A24E38 |
+-------------+-----------+

For production scenarios it is recommended to use the pgoutput plug-in, which emits change events using an efficient Postgres-specific binary format and is available by default in Postgres since version 10. Other commonly used options include the Decoderbufs plug-in (based on the Google Protocol Buffers format) and wal2json (emitting change events as JSON).

Changes are typically retrieved from remote clients such as Debezium by establishing a replication stream with the database. Alternatively, you can use the function <span class="inline-code">pg_logical_slot_get_changes()</span>, which lets you fetch changes from a given replication slot via SQL, optionally reading only up to a specific LSN (the first NULL parameter) or only a specific number of changes (the second NULL parameter). This comes in handy for testing purposes:

postgresuser@postgres:demodb> SELECT * FROM pg_logical_slot_get_changes('demo_slot', NULL, NULL);

+-------+-------+--------+
| lsn   | xid   | data   |
|-------+-------+--------|
+-------+-------+--------+

No changes should be returned at this point. Let’s insert a logical decoding message using the <span class="inline-code">pg_logical_emit_message()</span> function:

postgresuser@postgres:demodb> SELECT * FROM pg_logical_emit_message(true, 'context', 'Hello World!');

+---------------------------+
| pg_logical_emit_message   |
|---------------------------|
| 0/1A24F68                 |
+---------------------------+

The function has three parameters:

  • <span class="inline-code">transactional</span>: a boolean flag indicating whether the message should be transactional or not; when issued while a transaction is pending and that transaction gets rolled back eventually, a transactional message would not be emitted, whereas a non-transactional message would be written to the WAL nevertheless
  • <span class="inline-code">prefix</span>: a textual identifier for categorizing messages; for instance, this could indicate the type of a specific message
  • <span class="inline-code">content</span>: the actual payload of the message, either as text or binary data; you have full flexibility of what to emit here, e.g., in regard to format, schema, and semantics

When you retrieve changes from the slot again after having emitted a message, you now should see three change events: a <span class="inline-code">BEGIN</span> and a <span class="inline-code">COMMIT</span> event for the implicitly created transaction when emitting the event, and the “Hello World!” message itself. Note that this message doesn’t appear in any Postgres table or view as would be the case when adding data using the <span class="inline-code">INSERT</span> statement; this message is solely present in the database's transaction log.

There are a few other useful functions dealing with logical decoding messages and replication slots, including the following:

  • <span class="inline-code">pg_logical_slot_get_binary_changes()</span>: retrieves binary messages from a slot
  • <span class="inline-code">pg_logical_slot_peek_changes()</span>: allows to take a look at changes from a slot without advancing it
  • <span class="inline-code">pg_replication_slot_advance()</span>: advances a replication slot
  • <span class="inline-code">pg_drop_replication_slot()</span>: deletes a replication slot

You also can query the <span class="inline-code">pg_replication_slots</span> view for examining the current status of your replication slots, latest confirmed LSN, and more.

Use Cases

Having discussed the foundations of logical decoding messages, let’s now explore a few use cases of this useful Postgres API.

The Outbox Pattern

For microservices, it’s a common requirement that, when processing a request, a service needs to update its own database and simultaneously send a message to other services. As an example, consider a “fulfillment” service in an e-commerce scenario: when the status of a shipment changes from <span class="inline-code">READY_TO_SHIP</span> to <span class="inline-code">SHIPPED</span>, the shipment’s record in the fulfillment service database needs to be updated accordingly, but also a message should be sent to the “customer” service so that it can update the customer’s account history and trigger an email notification for the customer.

Now, when using data streaming platforms like Apache Kafka for connecting your services, you can’t reliably implement this scenario by just letting the fulfillment service issue its local database transaction and then send a message via Kafka. The reason is that it is not supported to have shared transactions for a database and Kafka (in technical terms, Kafka can’t participate in distributed transaction protocols like XA). While everything looks fine on the surface, you can end up with an inconsistent state in case of failures. The database transaction could get committed, but sending out the notification via Kafka fails. Or, the other way around: the customer service gets notified, but the local database transaction gets rolled back.

While you can find this kind of implementation in many applications, always remember: “Friends don’t let friends do dual writes”! A solution to this problem is the outbox pattern: instead of trying to update two resources at once (a database and Kafka), you only update a single one—the service’s database. When updating the shipment state in the database, you also write the message to be sent to an outbox table; this happens as part of one shared transaction, i.e., applying the atomicity guarantees you get from ACID transactions. Either the shipment state update and the outbox message get persisted, or none of them do. You then use change data capture to retrieve any inserts from the outbox in the database and propagate them to consumers.

More information about the outbox pattern can be found in this blog post on the Debezium blog. Another resource is this article on InfoQ which discusses how the outbox pattern can be used as the foundation for implementing Sagas between multiple services. In the following, I’d like to dive into one particular implementation approach for the pattern. Instead of inserting outbox events in a dedicated outbox table, the idea is to emit them just as logical decoding messages to the WAL.

There are pros and cons to either approach. What makes the route via logical decoding messages compelling is that it avoids any housekeeping needs. Unlike with an outbox table, there’s no need to remove messages after they have been consumed from the transaction log. Also, this emphasizes the nature of an outbox being an append-only medium: messages must never be modified after being added to the outbox, which might happen by accident with a table-based approach.

Regarding the content of outbox messages, you have full flexibility there in general. Sticking to the e-commerce domain from above, it could, for instance, describe a shipment serialized as JSON, Apache Avro, Google Protocol Buffers, or any other format you choose. What’s important to keep in mind is that while the message content doesn’t adhere to any specific table schema from a database perspective, it’s subject to an (ideally explicit) contract between the sending application and any message consumers. In particular, the schema of any emitted events should only be modified if you keep in mind the impact on consumers and backward compatibility.

One commonly used approach is to look at the design of outbox events and their schemas from a domain-driven design perspective. Specifically, Debezium recommends that your messages have the following attributes:

  • id: a unique message id, e.g., a UUID, which consumers can use for deduplication purposes
  • aggregate type: describes the kind of aggregate an event is about, e.g., “customer,” “shipment,” or “purchase order”; when propagating outbox events via Kafka or other streaming platforms, this can be used for sending events of one aggregate type to a specific topic
  • aggregate id: the id of the aggregate an event is about, e.g., a customer or order id; this can be used as the record key in Kafka, thus ensuring all events pertaining to one aggregate will go to the same topic partition and making sure consumers receive these events in the correct order
  • payload: the actual message payload; unlike “raw” table-level CDC events, this can be a rich structure, representing an entire aggregate and all its parts, which in the database itself may spread across multiple tables
Figure 1: Routing outbox events from the transaction log to different Kafka topics.

Enough of the theory—let’s see how a database transaction could look, which emits a logical decoding message with an outbox event. In the accompanying GitHub repository, you can find a Docker Compose file for spinning up all the required components and detailed instructions for running the complete example yourself. Emit an outbox message like this:

postgresuser@postgres:demodb> SELECT * FROM pg_logical_emit_message(
  true,
  'outbox',
  '{
    "id" : "298c2cc3-71bb-4d2b-b5b4-1b14006d56e6",
    "aggregate_type" : "shipment",
    "aggregate_id" : 42,
    "payload" : {
      "customer_id" : 7398,
      "item_id" : 8123,
      "status" : "SHIPPED",
      "numberOfPackages" : 3,
      "address" : "Bob Summers, 12 Main St., 90210, Los Angeles/CA, US"
    }
  }'
 );
 

This creates a transactional message (i.e., it would not be emitted if the transaction aborts, e.g., because of a constraint violation of another record inserted in the same transaction). It uses the “outbox” prefix (allowing it to distinguish it from messages of other types) and contains a JSON message as the actual payload.

Regarding retrieving change events and propagating them to Kafka, the details depend on how exactly Debezium, as the underlying CDC tool, is deployed. When used with Kafka Connect, Debezium provides a single message transform (SMT) that supports outbox tables and, for instance, routes outbox events to different topics in Kafka based on a configurable column containing the aggregate type. However, this SMT doesn’t yet support using logical decoding messages as the outbox format.

When using Debezium via Flink CDC, you could implement a similar logic using a custom <span class="inline-code">KafkaRecordSerializationSchema</span> which routes outbox events to the right Kafka topic and propagates the aggregate id to the Kafka message key, thus ensuring correct ordering semantics. A basic implementation of this could look like this (you can find the complete source code, including the usage of this serializer in a Flink job here):

public class OutboxSerializer implements KafkaRecordSerializationSchema {

  private static final long serialVersionUID = 1L;

  private ObjectMapper mapper;

  @Override
  public ProducerRecord serialize(ChangeEvent element, 
      KafkaSinkContext context, Long timestamp) {
    try {
      JsonNode content = element.getMessage().getContent();


      ProducerRecord record =
          new ProducerRecord(
        content.get("aggregate_type").asText(),
        content.get("aggregate_id").asText().getBytes(Charsets.UTF_8),
        mapper.writeValueAsBytes(content.get("payload"))
      );

      record.headers().add("message_id",      
          content.get("id").asText().getBytes(Charsets.UTF_8));

      return record;
    }
    catch (JsonProcessingException e) {
      throw new IllegalArgumentException(
          "Couldn't serialize outbox message", e);
    }
  }

  @Override
  public void open(InitializationContext context,
    KafkaSinkContext sinkContext) throws Exception {


    mapper = new ObjectMapper();
    SimpleModule module = new SimpleModule();
    module.addDeserializer(Message.class, new MessageDeserializer());
        mapper.registerModule(module);
  }
}

With that Flink job in place, you’ll be able to examine the outbox message on the “shipment” Kafka topic like so:

docker run --tty --rm \
  --network logical-decoding-network \
  quay.io/debezium/tooling:1.2 \
  kcat -b kafka:9092 -C -o beginning -q -t shipment \
  -f '%k -- %h -- %s\n'

42 -- message_id=298c2cc3-71bb-4d2b-b5b4-1b14006d56e6 -- {"customer_id":7398,"item_id":8123,"status":"SHIPPED","numberOfPackages":3,"address":"Bob Summers, 12 Main St., 90210, Los Angeles/CA, US"}

The topic name corresponds to the specified aggregate type, i.e., if you were to issue outbox events for other aggregate types, they’d be routed to different topics accordingly. The message key is 42, matching the aggregate id. The unique event id is propagated as a Kafka message header, enabling consumers to implement efficient deduplication by keeping track of the ids they’ve already received and processed and ignoring any potential duplicates they may encounter. Lastly, the payload of the outbox event is propagated as the Kafka message value.

In particular, in larger organizations with a diverse set of event producers and consumers, it makes sense to align on a shared event envelope format, which standardizes common attributes like event timestamp, origin, partitioning key, schema URLs, and others. The CloudEvents specification comes in handy here, especially for defining event types and their schemas. It is an option worth considering to have your applications emit outbox events adhering to the CloudEvents standard.

Logging

While log management of modern applications typically happens through dedicated platforms like Datadog or Splunk, which ingest changes from dedicated APIs or logs in the file system, it sometimes can be convenient to persist log messages in the database of an application. Log libraries such as the widely used log4j 2 provide database-backed appenders for this purpose. These will typically require a second connection for the logger, though, because in case of a rollback of an application transaction itself, you still (and in particular then) want to write out any log messages, helping you with failure analysis.

Non-transactional logical decoding messages can be a nice means of using a single connection and still ensuring that log messages persist, also when a transaction is rolled back. For example, let’s consider the following situation with two transactions, one of which is committed and one rolled back:

Figure 2: Using non-transactional logical decoding messages for logging purposes.

To follow along, run the following sequence of statements in the pgcli shell:

–- Assuming this table: CREATE TABLE data (id INTEGER, value TEXT);

BEGIN;
INSERT INTO data(id, value) VALUES('1', 'foo');
SELECT * FROM pg_logical_emit_message(false, 'log', 'OK');
INSERT INTO data(id, value) VALUES('2', 'bar');
COMMIT;

BEGIN;
INSERT INTO data(id, value) VALUES('3', 'baz');
SELECT * FROM pg_logical_emit_message(false, 'log', 'ERROR');
INSERT INTO data(id, value) VALUES('4', 'qux');
ROLLBACK;

The first transaction inserts two records in a new table, “data” and also emits a logical decoding message. The second transaction applies similar changes but then is rolled back. When retrieving the change events from the replication slot (using the “testing” decoding plug-in as shown above), the following events will be returned:

postgresuser@postgres:demodb> SELECT * FROM pg_logical_slot_peek_changes('demo_slot', NULL, NULL) order by lsn;

+-----------+-------+------------------------------------------------------------+
| lsn       | xid   | data                                                       |
|-----------+-------+------------------------------------------------------------|
| 0/1A483F8 | 768   | BEGIN 768                                                  |
| 0/1A504B8 | 768   | table public.data: INSERT: id[integer]:1 value[text]:'foo' |
| 0/1A50530 | 768   | message: transactional: 0 prefix: log, sz: 2 content:OK    |
| 0/1A50530 | 768   | table public.data: INSERT: id[integer]:2 value[text]:'bar' |
| 0/1A509B8 | 768   | COMMIT 768                                                 |
| 0/1A50A38 | 769   | message: transactional: 0 prefix: log, sz: 5 content:ERROR |
+-----------+-------+------------------------------------------------------------+

As expected, there are two <span class="inline-code">INSERT</span> events and the log message for the first transaction. However, there are no change events for the aborted transaction for the <span class="inline-code">INSERT</span> statements, as it was rolled back. But as the logical decoding message was non-transactional, it still was written to the WAL and can be retrieved. I.e., you actually can have that cake and eat it too!

Audit Logs

In enterprise applications, keeping an audit log of your data is a common requirement, i.e., a complete trail of all the changes done to a database record, such as a purchase order or a customer.

There are multiple possible approaches for building such an audit log; one of them is to copy earlier record versions into a separate history table whenever a data change is made. Arguably, this increases application complexity. Depending on the specific implementation strategy, you might have to deploy triggers for all the tables that should be audited or add libraries such as Hibernate Envers, an extension to the popular Hibernate object-relational mapping tool. In addition, there’s a performance impact, as the audit records are inserted as part of the application’s transactions, thus increasing write latency.

Change data capture is an interesting alternative for building audit logs: extracting data changes from the database transaction log requires no changes to writing applications. A change event stream, with events for all the inserts, updates, and deletes executed for a table—e.g., persisted as a topic in Apache Kafka, whose records are immutable by definition—could be considered a simple form of an audit log. As the CDC process runs asynchronously, there’s no latency impact on writing transactions.

One shortcoming of this approach—at least in its most basic form—is that it doesn’t capture contextual metadata, like the application user making a given change, client information like device configuration or IP address, use case identifiers, etc. Typically, this data is not stored in the business tables of an application and thus isn’t exposed in raw change data events. 

The combination of logical decoding messages and stream processing, with Apache Flink, can provide a solution here. At the beginning of each transaction, the source application writes all the required metadata into a message; in comparison to writing a full history entry for each modified record, this just adds a small overhead on the write path. You can then use a simple Flink job for enriching all the subsequent change events from that same transaction with that metadata. As all change events emitted by Debezium contain the id of the transaction they originate from, including logical decoding messages, correlating the events of one transaction isn’t complicated. The following image shows the general idea:

Figure 3: Enriching data change events with transaction-scoped audit metadata

When it comes to implementing this logic with Apache Flink, you can do this using a rather simple mapping function, specifically by implementing the <span class="inline-code">RichFlatMapFunction</span> interface, which allows you to combine the enrichment functionality and the removal of the original logical decoding messages in a single operator call:

public void flatMap(String value, Collector out)
    throws Exception {

  ChangeEvent changeEvent = mapper.readValue(value, ChangeEvent.class);
  String op = changeEvent.getOp();
  String txId = changeEvent.getSource().get("txId").asText();

  // logical decoding message
  if (op.equals("m")) {
    Message message = changeEvent.getMessage();


    // an audit metadata message -> remember it
    if (message.getPrefix().equals("audit")) {
      localAuditState = new AuditState(txId, message.getContent());
      return;
    }
    else {
      out.collect(value);
    }
  }
  // a data change event -> enrich it with the metadata
  else {
    if (txId != null && localAuditState != null) {
      if (txId.equals(localAuditState.getTxId())) {
        changeEvent.setAuditData(localAuditState.getState());
      }
    else {
      localAuditState = null;
    }
  }

  changeEvent.setTransaction(null);
  out.collect(mapper.writeValueAsString(changeEvent));
}

The logic is as follows:

  • When the incoming event is of type “m” (i.e., a logical decoding message) and it is an audit metadata event, put the content of the event into a Flink value state
  • When the incoming event is of any other type, and we have stored audit state for the event’s transaction before, enrich the event with that state
  • When the transaction id of the incoming event doesn’t match what’s stored in the audit state (e.g., when a transaction was issued without a metadata event at the beginning), clear the state store and propagate the event as is

You can find a simple yet complete Flink job that runs that mapping function against the Flink CDC connector for Postgres in the aforementioned GitHub repository. See the instructions in the README for running that job, triggering some data changes, and observing the enriched change events. As an example, let’s consider the following transaction which first emits a logical decoding message with the transaction metadata (user name and client IP address) and then two <span class="inline-code">INSERT</span> statements:

BEGIN;
SELECT * FROM pg_logical_emit_message(true, 'audit', '{ "user" : "bob@example.com", "client" : "10.0.0.1" }');
INSERT INTO inventory.customer(first_name, last_name, email) VALUES ('Bob', 'Green', 'bob@example.com');
INSERT INTO inventory.address
  (customer_id, type, line_1, line_2, zip_code, city, country)
VALUES
  (currval('inventory.customer_id_seq'), 'Home', '12 Main St.', 'sdf', '90210', 'Los Angeles', 'US');
COMMIT;

The enriched change events, as emitted by Apache Flink, would look like so:

{
  "op" : "c",
  "ts_ms" : 1673434483049,
  "source" : {
    "connector" : "postgresql",
    "snapshot" : false,
    "db" : "demodb",
    "table" : "customer"
    "lsn" : 24023128,
    "txId" : 555,
    ...
  },
  "before" : null,
  "after" : {
    "id" : 1018,
    "first_name" : "Bob",
    "last_name" : "Green",
    "email" : "bobasdf@example.com"
  },
  "auditData" : {
    "user" : "bob@example.com",
    "client" : "10.0.0.1"
  }
}
{
  "op" : "c",
  "ts_ms" : 1673434483050,
  "source" : {
    "connector" : "postgresql",
    "snapshot" : false,
    "db" : "demodb",
    "table" : "address"
    "lsn" : 24023129,
    "txId" : 555,
    ...
  },
  "before" : null,
  "after" : {
    "id" : 10007,
    "customer_id" : 1018,
    "type" : "Home",
    "line_1" : "12 Main St.",
    "line_2" : "sdf",
    "zip_code" : "90210",
    "city" : "Los Angeles",
    "country" : "US"
  },
  "auditData" : {
    "user" : "bob@example.com",
    "client" : "10.0.0.1"
  }
}

Within the same Flink job, you now could add a sink connector and for instance write the enriched events into a Kafka topic. Alternatively, depending on your business requirements, it can be a good idea to propagate the change events into a queryable store, for instance, an OLAP store like Apache Pinot or Clickhouse. You could use the same approach for enriching change events with contextual metadata for other purposes too, generally speaking for capturing all kinds of “intent” which isn’t directly persisted in the business tables of your application.

Bonus: Advancing Replication Slots

Finally, let’s discuss a technical use case for logical decoding messages: advancing Postgres replication slots. This can come in handy in certain scenarios, where otherwise large segments of the WAL could be retained by the database, eventually causing the database machine to run out of disk space.

This is because replication slots are always created in the context of a specific database, whereas the WAL is shared between all the databases on the same Postgres host. This means a replication slot set up for a database without any data changes and which, therefore, can’t advance, will retain potentially large chunks of WAL if changes are made to another database on the same host.

To experience this situation, stop the currently running Docker Compose set-up and launch this alternative Compose file from the example project:

docker compose -f docker-compose-multi-db.yml up

This spins up a Postgres database container with two databases, DB1 and DB2. Then launch the AdvanceSlotMain class. You can do so via Maven (note this is just for demonstration and development purposes; usually, you’d package up your Flink job as a JAR and deploy it to a running Flink cluster):

mvn exec:exec@advanceslot

It runs a simple Flink pipeline that retrieves all changes from the DB2 database and prints them out on the console. Now, do some changes on the DB1 database:

docker run --tty --rm -i \
  --network logical-decoding-network \
  quay.io/debezium/tooling:1.2 \
  bash -c 'pgcli postgresql://postgresuser:postgrespw@order-db:5432/db1'

postgresuser@order-db:db1> CREATE TABLE data (id INTEGER, value TEXT);
postgresuser@order-db:db1> INSERT INTO data SELECT generate_series(1,1000) AS id, md5(random()::text) AS value;

Query the status of the replication slot (“flink”, set up for database “DB2”), and as you keep running more inserts in DB1, you’ll see that the retained WAL of that slot continuously grows, as long as there are no changes done over in DB2:

postgresuser@order-db:db1> SELECT
  slot_name,
  database,
  pg_size_pretty(
    pg_wal_lsn_diff(
      pg_current_wal_lsn(), restart_lsn)) AS retained_wal,
  active,
  restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
+-----------------+------------+----------------+----------+---------------+-----------------------+
| slot_name       | database   | retained_wal   | active   | restart_lsn   | confirmed_flush_lsn   |
|-----------------+------------+----------------+----------+---------------+-----------------------|
| flink           | db2        | 526 kB         | True     | 0/22BA030     | 0/22BA030             |
+-----------------+------------+----------------+----------+---------------+-----------------------+

The problem is that as long as there are no changes in the DB2 database, the CDC connector of the running Flink job will never be invoked and thus never have a chance to acknowledge the latest processed LSN of its replication slot. Now, let’s use pg_logical_emit_message() to fix this situation. Get another Postgres shell, this time for DB2, and emit a message like so:

docker run --tty --rm -i \
  --network logical-decoding-network \
  quay.io/debezium/tooling:1.2 \
  bash -c 'pgcli postgresql://postgresuser:postgrespw@order-db:5432/db2'

postgresuser@order-db:db2> SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar);

In the console output of AdvanceSlotMain you should see the change event emitted by the Debezium connector for that message. With the next checkpoint issued by Flink (look for “Completed checkpoint XYZ for job …” messages in the log), the LSN of that event will also be flushed to the database, essentially allowing the database to discard any WAL segments before that. If you now examine the replication slot again, you should find that the “retained WAL” value is much lower than before (as this process is asynchronous, it may take a bit until the disk space is freed up).

Wrapping Up

Logical decoding messages are not widely known yet very powerful tools, which should be in the box for every software engineer working with Postgres. As you’ve seen, the ability to emit messages into the write-ahead log without them ever surfacing in any actual table allows for a number of interesting use cases, such as reliable data exchange between microservices (thus avoiding unsafe dual writes), application logging, or providing metadata for building audit logs. Employing stateful stream processing using Apache Flink, you can enrich and route your captured messages as well as apply other operations on your data change events, such as filtering, joining, windowed aggregations, and more.

Where there is great power, there are also great responsibilities. As logical decoding messages don’t have an explicit schema, unlike your database tables, the application developer must define sensible contracts and carefully evolve them, always keeping backward compatibility in mind. The CloudEvents format can be a useful foundation for your custom message schemas, providing all the producers and consumers in an organization with a consistent message structure and well-defined semantics.

If you’d like to get started with your explorations around logical decoding messages, look at the GitHub repo accompanying this article, which contains the source code of all the examples shown above and detailed instructions for running them.

Many thanks to Hans-Peter Grahsl, Robert Metzger, and Srini Penchikala for their feedback while writing this article.

Additional Resources

📫 Email signup 👇

Did you enjoy this issue of Checkpoint Chronicle? Would you like the next edition delivered directly to your email to read from the comfort of your own home?

Simply enter your email address here and we'll send you the next issue as soon as it's published—and nothing else, we promise!

Gunnar Morling

Gunnar is an open-source enthusiast at heart, currently working on Apache Flink-based stream processing. In his prior role as a software engineer at Red Hat, he led the Debezium project, a distributed platform for change data capture. He is a Java Champion and has founded multiple open source projects such as JfrUnit, kcctl, and MapStruct.