Debezium PostgreSQL CDC Source Connector

Type: source
Class: io.debezium.connector.postgresql.PostgresConnector
Target System: PostgreSQL
Maintainer: Debezium / Red Hat
License: Apache License 2.0
Project: github.com/debezium/debezium
Download: Debezium Releases

This page documents version 3.0.8.Final. Newer versions should be compatible unless there are breaking changes, but field names or default values may differ. If you notice discrepancies, please contact Axual Support.

Description

The Debezium PostgreSQL CDC Source Connector captures change events from PostgreSQL databases using logical replication and publishes them as records to Kafka topics.

It is maintained by the Debezium community as part of the open-source Debezium project (github.com/debezium/debezium).

The connector uses PostgreSQL logical replication (wal_level=logical) to tail the write-ahead log and streams every committed change to Kafka without running polling queries against the source tables. If you only need periodic data snapshots rather than real-time events, use the JDBC Source Connector — PostgreSQL instead.

Features

  • Real-time Change Data Capture (CDC) from PostgreSQL tables using logical replication (wal_level=logical)

  • Captures inserts, updates, and deletes

  • Stream name follows the convention <nickname>.<database>.<table>

  • Works with Google Cloud SQL PostgreSQL

When to Use

  • You need real-time CDC from PostgreSQL tables into Kafka.

  • You want to capture all DML events with millisecond latency.

When NOT to Use

  • You only need periodic polling — use the JDBC Source Connector — PostgreSQL instead.

  • PostgreSQL logical replication (wal_level=logical) is not enabled on your database server.

Installation

The connector is available from the Debezium Releases page.

  1. Navigate to the releases page and select the version matching your Kafka Connect installation.

  2. Download the JAR file.

For installation steps, see Installing Connector Plugins.

Configuration

For the complete configuration reference, see the official source connector documentation.

To configure a connector in Axual Self-Service, see Starting Connectors.
To speed up your deployment, use the Terraform Boilerplate or the Management API Boilerplate.

Axual Platform requirements

When running this connector on the Axual Platform, supply the following additional properties to disable automatic topic creation. The connector has a bug that requires the topic.creation.default.* workaround properties even when topic.creation.enable is set to false.

Property                                     Value
topic.creation.enable                        false
topic.creation.default.partitions            1
topic.creation.default.replication.factor    3
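
As a minimal sketch, the required properties can be merged into an existing connector configuration before it is submitted. The helper name with_axual_topic_settings is hypothetical and not part of any Axual or Debezium API; the values mirror the ones listed above.

```python
# Properties the Axual Platform requires, with the values listed above.
AXUAL_REQUIRED = {
    "topic.creation.enable": "false",
    "topic.creation.default.partitions": "1",
    "topic.creation.default.replication.factor": "3",
}


def with_axual_topic_settings(config: dict) -> dict:
    """Return a copy of config with the Axual-required properties applied.

    Hypothetical helper: any conflicting topic.creation.* value in the
    input is overridden so that automatic topic creation stays disabled.
    """
    merged = dict(config)          # do not mutate the caller's dict
    merged.update(AXUAL_REQUIRED)  # required values always win
    return merged


if __name__ == "__main__":
    base = {"topic.prefix": "postgresql_nickname", "topic.creation.enable": "true"}
    print(with_axual_topic_settings(base))
```

Returning a copy keeps the original configuration untouched, which makes it easier to compare what was supplied with what is actually deployed.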

Getting Started

Prerequisites

PostgreSQL instance

We’ll deploy a PostgreSQL instance on Google Cloud.
If you have a Google account, you can sign up for a free trial of Google Cloud.

You can read more about PostgreSQL on Google Cloud here.

  1. Create a new PostgreSQL instance.

    • instance ID: my-axual-postgresql-connector-test
      The instance ID is irrelevant for our example: It will only be displayed in your Google overview.

    • Password: You can set the password for the postgres user of the database. We’ll use this password when configuring the Connector-Application later.
      You can reset this password later.

    • Database: Select "PostgreSQL 14" (likely the default option).

    • Choose a configuration to start with: Development

    • Region: europe-west1
      The region isn’t very relevant for our example, but usually you would select the region geographically closest to the Connect-Cluster.

    • Zonal availability: Single zone

    • Customize your instance: Click "Show configuration options"

      • Machine type: Click the dropdown menu and select Lightweight. 1vCPU is enough.

      • Storage: go for the least amount of storage.

        • If available, use HDDs.

        • If given the option, do not enable automatic storage increases.

      • Connections: Leave only "Public IP" selected.

        • Authorized networks: Click "Add network". Use any name and 0.0.0.0/0 as the CIDR notation, then click "Done".
          This opens the database to the entire internet. That is acceptable only because this is a short-lived test instance that we’ll delete shortly.

      • Authentication: Skip this config if it’s present

      • Data protection:

        • Enable backups. Under "Advanced options", select (single) Region (europe-west1)

        • If available, enable point-in-time recovery

        • Disable deletion protection

      • Maintenance: Skip this config

      • Flags and parameters: cloudsql.logical_decoding : on
        The effect of this flag is to set wal_level to logical.

      • Labels: Skip this config

    • Click "Create instance".

  2. While the database server is being deployed, let’s create a Cloud Storage bucket.

    • Name your bucket: postgresql-init-axual-connect (or something different, in case the name is already taken).
      Skip labels.
      Click "Continue".
      The bucket name is irrelevant for our example: It will only be displayed in your Google overview.

    • Choose where to store your data:

      • Region: europe-west1 (you don’t need Multi or dual region. Single region is enough)
        The region isn’t very relevant for our example, but usually you would select a region geographically closest to the Connect-Cluster.

    • Choose a default storage class for your data: Standard.
      Click "Continue".

    • Choose how to control access to objects: Uniform.
      Click "Continue".

    • Choose how to protect object data: None

      • Data encryption: Skip this config

  3. Click "Create" to create the bucket.
    If you get a pop-up with "Public access will be prevented", click "Confirm".

  4. The bucket page will open. Click "Upload files"

  5. Save the following text to a file on your system, and then click "Upload files" to upload it into the bucket:

    Create the test table and seed data:

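    -- Allow the postgres user to open replication connections.
    ALTER USER postgres REPLICATION;

    -- Table whose changes the connector captures.
    -- The name is unquoted, so PostgreSQL stores it as customers.
    CREATE TABLE CUSTOMERS
    (
      user_id        SERIAL        PRIMARY KEY,
      customer_name  VARCHAR(100)  NOT NULL,
      email          VARCHAR(250)
    );

    -- Seed rows for the example.
    INSERT INTO CUSTOMERS (customer_name, email) VALUES
    ('alin', NULL),
    ('frenk', NULL),
    ('doxxed', NULL);

  6. You can close the buckets page. Let’s go back to our SQL instance.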
    Select your instance to open it. Note down the public IP address.
    We’ll use it when configuring the Connector-Application later.

  7. [Optional] Change the postgres user’s password:

    • On the left-side menu, click "Users".

    • Click the dots next to the postgres user.

    • Click "Change password".

    • Type a new password and click "OK".

  8. On the left-side menu, click "Databases".

    • Click "Create Database".
      Use inventory as the name: it is the import destination in the next step and the database.dbname value in the connector configuration.

      • Click "Create".

  9. On the left-side menu, click "Overview". Click the "Import" button.

    • Source: click "Browse".

      • Double-click your bucket and select the SQL file we saved earlier.

      • Click "SELECT" at the bottom of the page.

    • File format: SQL

    • Destination: inventory

    • Click "Import"
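
Once the import has finished, the setup can be checked from any SQL client. The following is a minimal sketch, assuming a Python DB-API 2.0 cursor that is already connected to the inventory database (for example one created with a PostgreSQL driver such as psycopg). The function name check_source_database is hypothetical.

```python
def check_source_database(cursor) -> list:
    """Return a list of problems found; an empty list means the checks passed.

    `cursor` is assumed to be a DB-API 2.0 cursor connected to the
    inventory database. If the customers table does not exist, the
    second query raises a driver-specific error instead.
    """
    problems = []

    # Logical decoding requires wal_level=logical.
    cursor.execute("SHOW wal_level")
    wal_level = cursor.fetchone()[0]
    if wal_level != "logical":
        problems.append(f"wal_level is {wal_level!r}, expected 'logical'")

    # The import step should have created and seeded the customers table.
    cursor.execute("SELECT count(*) FROM customers")
    row_count = cursor.fetchone()[0]
    if row_count == 0:
        problems.append("customers table is empty")

    return problems
```

Passing the cursor in, rather than opening a connection inside the function, keeps the sketch independent of any particular driver and of how credentials are handled.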

Axual stream

Create and deploy the following stream:

  • postgresql_nickname.inventory.customers — Contains DB events for the customers table.
    Stream name follows the pattern <nickname>.<database>.<table>. Create one per watched table.

Use Key/Value type String/String.

See Creating streams for instructions on creating and deploying a stream.

Steps

Step 1 — Create a connector application

  1. Follow the Creating streams documentation in order to create one stream and deploy it onto an environment.
    The name of the stream will be postgresql_nickname.inventory.customers.
    The key/value types will be String/String.

    This stream name is the composition of <nickname>.<database name>.<table name>. This naming pattern is enforced by the connector. Create one such stream for every table you intend to watch.

  2. Follow the Configure and install a connector documentation to set up a new Connector-Application.
    Let’s call it my_customers_app.
    The plugin name is io.debezium.connector.postgresql.PostgresConnector.
    If the plugin isn’t available, ask a platform operator to install it.

Step 2 — Configure the connector

  1. Provide the following minimal configuration:

    Property              Value
    database.hostname     123.123.123.123 (example value)
    database.port         5432
    database.user         postgres
    database.password     1234abcdEFG (example value)
    topic.prefix          postgresql_nickname
    database.dbname       inventory
    table.include.list    public.customers

    Also supply the Axual Platform-required properties described in the Axual Platform requirements section.

  2. Authorize the my_customers_app source Connector-Application to produce to the postgresql_nickname.inventory.customers stream.
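
Before starting the connector, it can help to sanity-check the minimal configuration. The sketch below is an assumption-laden illustration, not part of Debezium or Axual: the function name validate_minimal_config is hypothetical, and it treats table.include.list entries as plain <schema>.<table> names even though the connector also accepts regular expressions there.

```python
# Properties from the minimal configuration above.
REQUIRED_KEYS = (
    "database.hostname",
    "database.port",
    "database.user",
    "database.password",
    "topic.prefix",
    "database.dbname",
    "table.include.list",
)


def validate_minimal_config(config: dict) -> list:
    """Return human-readable problems; an empty list means nothing was found."""
    problems = [
        f"missing property: {key}" for key in REQUIRED_KEYS if not config.get(key)
    ]

    # database.port must be a valid TCP port number.
    port = str(config.get("database.port", ""))
    if port and not (port.isdigit() and 0 < int(port) < 65536):
        problems.append(f"database.port is not a valid port: {port}")

    # Simplification: expect each entry in <schema>.<table> form.
    for entry in str(config.get("table.include.list", "")).split(","):
        entry = entry.strip()
        if entry and entry.count(".") != 1:
            problems.append(f"table is not schema-qualified: {entry}")

    return problems


if __name__ == "__main__":
    example = {
        "database.hostname": "123.123.123.123",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "1234abcdEFG",
        "topic.prefix": "postgresql_nickname",
        "database.dbname": "inventory",
        "table.include.list": "public.customers",
    }
    print(validate_minimal_config(example))
```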

Step 3 — Start the connector

Start the connector application from Axual Self-Service.

Step 4 — Verify

In Axual Self-Service, use stream-browse on postgresql_nickname.inventory.customers to confirm CDC events are arriving when rows are inserted or updated in PostgreSQL.
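
When reading the browsed records, the change event value follows the Debezium envelope with before, after, source, and op fields. The sketch below shows one way to summarize such a value. It assumes the value is plain JSON without an embedded schema (JsonConverter with schemas.enable set to false); the function name summarize_event is hypothetical, and the sample event is hand-written and abbreviated, not captured connector output.

```python
import json
from typing import Optional

# Debezium operation codes and their meaning.
OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def summarize_event(raw_value: Optional[str]) -> str:
    """Summarize one change event value as '<operation> <schema>.<table>: <row>'."""
    # A record with a null value is a tombstone that follows a delete.
    if raw_value is None:
        return "tombstone"

    event = json.loads(raw_value)
    operation = OPERATIONS.get(event["op"], event["op"])
    source = event.get("source", {})
    # Deletes carry the old row in "before"; other operations carry the new row in "after".
    row = event["before"] if event["op"] == "d" else event["after"]
    return f"{operation} {source.get('schema')}.{source.get('table')}: {row}"


if __name__ == "__main__":
    sample = json.dumps({
        "before": None,
        "after": {"user_id": 4, "customer_name": "example", "email": None},
        "source": {"db": "inventory", "schema": "public", "table": "customers"},
        "op": "c",
    })
    print(summarize_event(sample))
```

Rows that were already in the table when the connector started arrive with the snapshot read operation, while later inserts, updates, and deletes arrive with their own codes.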

Cleanup

When you are done:

  1. Stop the connector application in Axual Self-Service.

  2. Remove stream access for the application if no longer needed.

  3. Delete your Cloud Storage bucket and your SQL instance if they were created only for testing.

Known limitations

  • PostgreSQL logical replication (wal_level=logical) must be enabled on the server.
    For Google Cloud SQL, set the cloudsql.logical_decoding flag to on.

  • topic.creation.enable=false must be set explicitly along with the topic.creation.default.* workaround properties due to a connector bug.

Examples

Full configuration on Axual Platform

Derived from the Terraform Boilerplate.

{
  "name": "my-debezium-postgresql-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres.example.com",
    "database.port": "5432",
    "database.user": "debezium_user",
    "database.password": "Deb3zium@PgPass",
    "database.dbname": "inventory",
    "slot.name": "debezium_slot",
    "publication.name": "debezium_publication",
    "publication.autocreate.mode": "disabled",
    "plugin.name": "pgoutput",
    "table.include.list": "public.customers",
    "topic.prefix": "postgresql_nickname",
    "topic.creation.enable": "false",
    "topic.creation.default.partitions": "1",
    "topic.creation.default.replication.factor": "3",
    "snapshot.mode": "initial",
    "poll.interval.ms": "5000",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "errors.tolerance": "none"
  }
}

Because publication.autocreate.mode is set to disabled, the publication (debezium_publication) must be created manually in PostgreSQL before starting the connector:

CREATE PUBLICATION debezium_publication FOR TABLE public.customers;
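
To keep the publication in step with the connector configuration, the statement can be derived from the same properties. This is a minimal sketch under stated assumptions: the helper names are hypothetical, table.include.list entries are treated as plain <schema>.<table> names rather than regular expressions, and identifiers are not quoted. The slot name check reflects the PostgreSQL rule that replication slot names may contain only lower-case letters, digits, and underscores.

```python
import re

# PostgreSQL replication slot names: lower-case letters, digits, underscores.
SLOT_NAME_PATTERN = re.compile(r"[a-z0-9_]+")


def is_valid_slot_name(name: str) -> bool:
    """Check slot.name against the allowed character set."""
    return SLOT_NAME_PATTERN.fullmatch(name) is not None


def create_publication_sql(config: dict) -> str:
    """Build the CREATE PUBLICATION statement for a connector config.

    Simplification: entries are used as-is, without quoting or
    regular-expression expansion.
    """
    tables = [t.strip() for t in config["table.include.list"].split(",") if t.strip()]
    return (
        f"CREATE PUBLICATION {config['publication.name']} "
        f"FOR TABLE {', '.join(tables)};"
    )


if __name__ == "__main__":
    config = {
        "slot.name": "debezium_slot",
        "publication.name": "debezium_publication",
        "table.include.list": "public.customers",
    }
    print(is_valid_slot_name(config["slot.name"]))
    print(create_publication_sql(config))
```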

License

The Debezium source connector is licensed under the Apache License, Version 2.0.