Change data capture in Postgres: How to use logical decoding and wal2json

This post has been republished via RSS; it originally appeared at: New blog articles in Microsoft Tech Community.

Databases don’t exist in isolation. Databases live in an ecosystem of software components: caches, search, dashboards, analytics, other databases, data lakes, web apps, and more. Your Postgres database partners with all these components to deliver the unique value of your application.

 

How does your ecosystem stay in sync? How do those other components get informed about changes in Postgres? Change data capture or CDC refers to any solution that identifies new or modified data.

 

One solution: Maybe you add a timestamp column to your PostgreSQL tables to record change times. Periodically you run a query to pull all the new data—that timestamp column will help you to identify what’s new since your last pull. It can be a workable solution if you’re okay with that schema change and batched updates—only identifying what’s new on some periodic basis. You have to determine an appropriate batching frequency. Too often may tie up CPU; too infrequent and you’ll fall behind on updates.

 

If what you want is row by row streaming of Postgres data changes as they happen, you’ll need a different solution

 

Real-time data updates allow you to keep disparate data systems in continuous sync and to respond quickly to new information. Like being able to show an online shopper recommendations that are based on items they’ve placed in their cart so far. Or, for a bank, being able to send customers a notification when there’s an unusual transaction on their credit card. Not to mention being able to use the customer’s response to adjust whether similar subsequent payments go through.

 

That’s where transaction log-based change data capture comes in. The transaction log naturally keeps track of each data change as it happens. You just need a way to read the log.

 

Logical decoding is the official name of PostgreSQL’s log-based change data capture feature. If the term logical decoding sounds unfamiliar, you may have heard of wal2json instead. Wal2json is a popular output plugin for logical decoding. People often use ‘wal2json’ to refer to ‘wal2json + logical decoding’. (FYI, Azure Database for PostgreSQL, our managed database service for Postgres, supports logical decoding and wal2json).

 

Let’s dive into this powerful Postgres feature.

 

 

Decoding “logical decoding” in Postgres

 

Question: What is “logical decoding” decoding?

Answer: The WAL – PostgreSQL’s write ahead log. 

 

The WAL (or transaction log) keeps track of all committed data transactions: It is the authority on everything that has happened on your Postgres instance. Its primary purpose is to help your Postgres database recover its state in the event of a crash. The WAL is written in a special format that Postgres understands because Postgres is its first and primary customer.

 

But such a log of all database changes is a handy way for other interested parties to understand what has happened on the database. And not only what happened, but in what order. 

 

So, there’s this treasure trove of data changes in Postgres (the WAL) but it’s written in a format your non-Postgres services won’t understand. Enter logical decoding: the way Postgres enables you to translate (decode!) and emit the WAL into a form you can use.

 

Here’s what the logical decoding process looks like:

Postgres logical decoding explained 1200.png


When a row is changed in a Postgres table, that change is recorded in the WAL. If logical decoding is enabled, the record of that change is passed to the output plugin. The output plugin changes that record from the WAL format to the plugin’s format (e.g. a JSON object). Then the reformatted change exits Postgres via a replication slot. Finally, there’s the consumer. A consumer is any application of your choice that connects to Postgres and receives the logical decoding output.

 

 

Logical decoding in action

 

To stream logical decoding, three Postgres parameters need to be set in postgresql.conf:

 

wal_level = logical max_replication_slots = 10 max_wal_senders = 10

 

Setting wal_level to logical allows the WAL to record information needed for logical decoding. max_replication_slots and max_wal_senders must be at least 1 or higher if your server may be using more replication connections.

 

Then your Postgres server needs to be restarted to apply the changes.

 

I’m using an Azure Database for PostgreSQL server. In Azure, instead of individually setting those three parameters, you can set azure.replication_support:

 

az postgres server configuration set --resource-group rgroup --server-name pgserver --name azure.replication_support --value logical

 

and restart the server:

 

az postgres server restart --resource-group rgroup --name pgserver

 

 

For our example we’ll be using wal2json as the output plugin. WAL to JSON. As you might have guessed, this output plugin converts the Postgres write-ahead log output into JSON objects. wal2json is an open source project that you can download and install to your local Postgres setup. It is already installed on Azure Database for Postgres servers.

 

We need a slot and a consumer. pg_recvlogical is a Postgres application that can manage slots and consume the stream from them. pg_recvlogical is included in the Postgres distribution. That means if you’ve ever installed Postgres on your laptop, you probably have pg_recvlogical.

 

To confirm you have pg_recvlogical, you can open a terminal and run

 

pg_recvlogical --version

 

Now connect to your Postgres database with pg_recvlogical and create a slot. Connect with a user that has replication permissions.

 

pg_recvlogical -h pgserver.postgres.database.azure.com -U rachel@pgserver -d postgres --slot logical_slot --create-slot -P wal2json

 

Then start the slot.

 

pg_recvlogical -h pgserver.postgres.database.azure.com -U rachel@pgserver -d postgres --slot logical_slot --start -o pretty-print=1 -f –

 

This terminal won’t return control to you – it is waiting to receive the logical decoding stream.

 

So we should give the slot something to stream by making data changes in Postgres. To do that, connect to the Postgres database like you normally do. I’m using psql (in a different terminal). Create a table and modify some rows. 

 

CREATE TABLE inventory (id SERIAL, item VARCHAR(30), qty INT, PRIMARY KEY(id)); INSERT INTO inventory (item, qty) VALUES ('apples', '100'); UPDATE inventory SET qty = 96 WHERE item = 'apples'; DELETE FROM inventory WHERE item = 'apples';

 

 

You can see the logical decoding output in the pg_recvlogical terminal. The format of the output is determined by wal2json, the output plugin we selected.

 

{         "change": [         ] } {         "change": [                 {                         "kind": "insert",                         "schema": "public",                         "table": "inventory",                         "columnnames": ["id", "item", "qty"],                         "columntypes": ["integer", "character varying(30)", "integer"],                         "columnvalues": [1, "apples", 100]                 }         ] } {         "change": [                 {                         "kind": "update",                         "schema": "public",                         "table": "inventory",                         "columnnames": ["id", "item", "qty"],                         "columntypes": ["integer", "character varying(30)", "integer"],                         "columnvalues": [1, "apples", 96],                         "oldkeys": {                                 "keynames": ["id"],                                 "keytypes": ["integer"],                                 "keyvalues": [1]                         }                 }         ] } {         "change": [                 {                         "kind": "delete",                         "schema": "public",                         "table": "inventory",                         "oldkeys": {                                 "keynames": ["id"],                                 "keytypes": ["integer"],                                 "keyvalues": [1]                         }                 }         ] }

 

 

To drop the replication slot,

 

pg_recvlogical -h pgserver.postgres.database.azure.com -U rachel@pgserver -d postgres --slot logical_slot --drop-slot

 

 

What data is captured by logical decoding?

 

Logical decoding can only output information about DML (data manipulation) events in Postgres, that is INSERT, UPDATE, and DELETE. DDL (data definition) changes like CREATE TABLE, ALTER ROLE, and DROP INDEX are not emitted by logical decoding. And neither is any command that's not an INSERT, UPDATE, or DELETE. Remember when we ran CREATE TABLE in our logical decoding test above? The output was blank:

 

{         "change": [         ] }

 

 

For INSERT and UPDATE, a new row is added to a table. This new row data is always sent to the output plugin.

 

For UPDATE and DELETE, a row is removed from a table. Whether that old row’s information gets sent to the output plugin depends on a Postgres table property called REPLICA IDENTITY. By default, only the primary key values of the old row are sent from the WAL.

 

As an example, let’s look at the update we did earlier.   

 

UPDATE inventory SET qty = 96 WHERE item = 'apples';

 

The logical decoding output was:

 

{         "change": [                 {                         "kind": "update",                         "schema": "public",                         "table": "inventory",                         "columnnames": ["id", "item", "qty"],                         "columntypes": ["integer", "character varying(30)", "integer"],                         "columnvalues": [1, "apples", 96],                         "oldkeys": {                                 "keynames": ["id"],                                 "keytypes": ["integer"],                                 "keyvalues": [1]                         }                 }         ] }

 

 

You can see that we get all the new row’s data, [1, "apples", 96]. For old data, we only get the primary key column, id.

 

REPLICA IDENTITY has other settings that vary the information you can get about updated and deleted rows.  

 

 

Wal2json is one output plugin. There are other options

 

We’ve already talked about one popular output plugin: wal2json. There are other output plugins to choose from. If you are self-hosting Postgres, you could even make your own.

 

Two output plugins ship natively with Postgres (no additional installation needed):

  • test_decoding: Available on Postgres 9.4+. Though created to be just an example of an output plugin, test_decoding is still useful if your consumer supports it (e.g. Qlik replicate).

  • pgoutput: Available since Postgres 10. pgoutput is used by Postgres to support logical replication, and is supported by some consumers for decoding (e.g. Debezium).

 

An output plugins receives data from the WAL. The plugin then decides what information to keep and how to present that information to you.

 

For example, new row data is always sent from the WAL to the output plugin. However, wal2json chooses not to output new row data for an UPDATE if the table has no primary key. test_decoding, on the other hand, will publish that row. But test_decoding is not JSON formatted with name/value pairs. Pick the output plugin that suits your scenario.

 

 

Six important things to know about slots

 

Logical decoding outputs data changes as a stream. That stream is called a logical replication slot.

 

You should keep in mind the following when dealing with slots:

 

  1. Each slot has one output plugin (you choose which).

  2. Each slot provides changes from only one database.

 

  1. But a single database can have multiple slots.

  2. Each data change is normally emitted once per slot.

  3. However, if the Postgres instance restarts, a slot may re-emit changes. Your consumer needs to handle that situation.

  4. An unconsumed slot is a threat to your Postgres instance’s availability.
    Strong words, yes, but it’s true. It is critical that you monitor your slots. If a slot’s stream isn’t being consumed, Postgres will hold on to all the WAL files for those unconsumed changes. This can lead to storage full or transaction ID wraparound.

 

Postgres has a table called pg_replication_slots that tracks the state of all replication slots. Keep your eye on the ‘active’ column. If a slot does not have a connection to a consumer, the column will be false.

pg_replication_slots table - monitoring slots.png

 

Another way to watch for the impact of an inactive slot is to have storage alerts configured. If storage is growing and you don’t know why, it may be due to WAL file retention by an unconsumed slot.

 

Finally, the consumer you choose may come with built-in monitoring so look for that feature.

 

You are better off deleting an unused slot than keeping it around.

 

 

With local decoding, Postgres gives you the WAL data. Now what?

 

Remember that a consumer is any application that can connect to Postgres and ingest the logical decoding stream. We used pg_recvlogical as the consumer in our example earlier.

 

You can create your own consumer, an app that parses and redirects the Postgres logical decoding stream to other components in your system. For example, in this PGConf.EU presentation Webedia uses a custom service called walparser to convert wal2json’s output into MQ messages, then sends the messages to RabbitMQ and ElasticSearch. Another example is Netflix’s DBLog.

 

Or, instead of making your own consumer, you can let someone else do the heavy lifting. There are change data capture connectors available that support Postgres logical decoding as a source and provide connections to various targets.

 

If you are looking for an open-source offering, Debezium is a popular change data capture solution built on Apache Kafka. Learn more about Debezium from their FAQ or this deep dive into change data capture patterns.

 

You could also explore paid services like Striim and Qlik Replicate. One advantage of all three consumers, compared to creating your own solution, is they support a variety of other sources like MySQL, Oracle, and SQL Server. If you have data in other database engines, you can use their connectors to integrate data instead of building a custom connector for each one.

 

Dynamic real-time responses are the hallmark of modern applications

 

Logical decoding in PostgreSQL provides an efficient way for your other app components to stay up-to-date with data changes in your Postgres database. Write once to the reliable Postgres log, then derive those change events for downstream targets like caches and search indexes. Instead of a pull model where each component queries Postgres at some interval, this is a push model where Postgres notifies you and your application of each change, as it happens. With logical decoding your Postgres database becomes a centerpiece of your dynamic real-time application.

REMEMBER: these articles are REPUBLISHED. Your best bet to get a reply is to follow the link at the top of the post to the ORIGINAL post! BUT you're more than welcome to start discussions here:

This site uses Akismet to reduce spam. Learn how your comment data is processed.