Debezium MySQL CDC Source Connector

Type

source

Class

io.debezium.connector.mysql.MySqlConnector

Target System

MySQL

Maintainer

Debezium / Red Hat

License

Apache License 2.0

Project

github.com/debezium/debezium

Download

Debezium Releases

This page documents version 3.0.8.Final. Newer versions should be compatible unless there are breaking changes, but field names or default values may differ. If you notice discrepancies, please contact Axual Support.

Description

The Debezium MySQL CDC Source Connector captures change events from MySQL databases using the MySQL binlog and publishes them as records to Kafka topics.

It is maintained by the Debezium community as part of the open-source project at github.com/debezium/debezium.

The connector reads the MySQL binary log (binlog), which records every committed change. It connects to MySQL as a replica and tails the binlog stream, so streaming does not query the source tables and adds little load to the database (the initial snapshot, if enabled, does read the captured tables once). Schema history is stored in a dedicated Kafka topic so events are decoded correctly even after schema changes.

Features

  • Real-time Change Data Capture (CDC) from MySQL tables using binlog replication

  • Captures inserts, updates, and deletes

  • Schema history stored in a dedicated Kafka topic

  • Axual-specific SSL key material managed via keyvault references

  • Stream name follows the convention <nickname>.<database>.<table>

When to Use

  • You need real-time CDC from MySQL tables into Kafka.

  • You want to capture all DML events (inserts, updates, deletes) with low latency.

When NOT to Use

Installation

The connector is available from the Debezium Releases page.

  1. Navigate to the releases page and select the version matching your Kafka Connect installation.

  2. Download the JAR file.

For installation steps, see Installing Connector Plugins.

Configuration

For the complete configuration reference, see the official source connector documentation.

To configure a connector in Axual Self-Service, see Starting Connectors.
To speed up your deployment, use the Terraform Boilerplate or the Management API Boilerplate.

Axual Platform requirements

The Debezium MySQL connector maintains a schema history by writing to and reading from a dedicated Kafka topic (schema.history.internal.kafka.topic). This means the connector opens an internal Kafka producer and consumer, separate from the main Connect worker, to persist schema changes. On the Axual Platform, Kafka requires SSL authentication, so this internal connection must also be configured with the SSL credentials of the connector application. The SSL values are read from Axual’s keyvault at runtime using the ${keyvault:…} reference syntax.

Supply the following additional properties when running this connector on the Axual Platform.

The schema.history.internal.kafka.topic value must be resolved using Axual’s name-resolving mechanism. Consult an Axual platform operator to acquire the name-resolving values (tenant, instance, and environment short names).

auto.create.topics.enable

false

schema.history.internal.consumer.group.id

This consumer-group name must be resolved!

Example resolved value:
axual-local-default-mysql_nickname-dbhistory

schema.history.internal.consumer.security.protocol

SSL

schema.history.internal.consumer.ssl.key.password

${keyvault:connectors/axual/local/default/my_hotels_app:ssl.key.password}

schema.history.internal.consumer.ssl.keystore.location

${keyvault:connectors/axual/local/default/my_hotels_app:ssl.keystore.location}

schema.history.internal.consumer.ssl.keystore.password

${keyvault:connectors/axual/local/default/my_hotels_app:ssl.keystore.password}

schema.history.internal.consumer.ssl.truststore.location

${keyvault:connectors/axual/local/default/my_hotels_app:ssl.truststore.location}

schema.history.internal.consumer.ssl.truststore.password

${keyvault:connectors/axual/local/default/my_hotels_app:ssl.truststore.password}

schema.history.internal.producer.security.protocol

SSL

schema.history.internal.producer.ssl.key.password

${keyvault:connectors/axual/local/default/my_hotels_app:ssl.key.password}

schema.history.internal.producer.ssl.keystore.location

${keyvault:connectors/axual/local/default/my_hotels_app:ssl.keystore.location}

schema.history.internal.producer.ssl.keystore.password

${keyvault:connectors/axual/local/default/my_hotels_app:ssl.keystore.password}

schema.history.internal.producer.ssl.truststore.location

${keyvault:connectors/axual/local/default/my_hotels_app:ssl.truststore.location}

schema.history.internal.producer.ssl.truststore.password

${keyvault:connectors/axual/local/default/my_hotels_app:ssl.truststore.password}
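
The twelve SSL properties above follow one pattern: the same five keyvault keys, once for the internal consumer and once for the internal producer. The following Python sketch generates them from the four short names. The function name history_ssl_properties and the constant SSL_KEYS are hypothetical helpers introduced for illustration; the keyvault path layout is copied from the example values above and may differ on your installation.

```python
# Keyvault keys referenced by the example values above.
SSL_KEYS = [
    "ssl.key.password",
    "ssl.keystore.location",
    "ssl.keystore.password",
    "ssl.truststore.location",
    "ssl.truststore.password",
]


def history_ssl_properties(tenant, instance, environment, application):
    """Build the schema-history consumer and producer SSL properties.

    Assumes the keyvault path layout shown in the example values:
    connectors/<tenant>/<instance>/<environment>/<application>
    """
    base = f"connectors/{tenant}/{instance}/{environment}/{application}"
    props = {}
    for client in ("consumer", "producer"):
        prefix = f"schema.history.internal.{client}"
        props[f"{prefix}.security.protocol"] = "SSL"
        for key in SSL_KEYS:
            # String concatenation avoids brace escaping inside an f-string.
            props[f"{prefix}.{key}"] = "${keyvault:" + base + ":" + key + "}"
    return props


if __name__ == "__main__":
    generated = history_ssl_properties("axual", "local", "default", "my_hotels_app")
    for name, value in sorted(generated.items()):
        print(f"{name}={value}")
```

Generating the values this way keeps the consumer and producer settings identical, which is easy to get wrong when copying twelve lines by hand.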

Getting Started

Prerequisites

MySQL instance

We’ll deploy a MySQL instance using Google Cloud.
If you have a Google account, you can sign up for a free trial of Google Cloud.

You can read more about MySQL on Google Cloud here.

  1. Create a new MySQL instance.

    • instance ID: my-axual-mysql-connector-test
      The instance ID is irrelevant for our example: It will only be displayed in your Google overview.

    • Password: You can set the password for the root user of the database. We’ll use this password when configuring the Connector-Application later.
      You can reset this password later.

    • Database: Select "MySQL 8.0" (likely the default option).

    • Choose a configuration to start with: Development

    • Region: europe-west1
      The region isn’t very relevant for our example, but usually you would select the region geographically closest to the Connect-Cluster.

    • Zonal availability: Single zone

    • Customize your instance: Click "Show configuration options"

      • Machine type: Click the dropdown menu and select Lightweight. 1vCPU is enough.

      • Storage: go for the least amount of storage.

        • If available, use HDDs.

        • If given the option, do not enable automatic storage increases.

      • Connections: Leave only "Public IP" selected.

        • Authorized networks. Click "Add network". Use any name and 0.0.0.0/0 as the CIDR notation (and click "Done").
          This will open the database to the entire internet. That’s ok, we’ll delete it shortly anyway.

      • Authentication: Skip this config if it’s present

      • Data protection:

        • Enable backups. Under "Advanced options", select (single) Region (europe-west1)

        • If available, enable point-in-time recovery

        • Disable deletion protection

      • Maintenance: Skip this config

      • Flags and parameters: Skip this config

      • Labels: Skip this config

    • Click "Create instance".

  2. While the database server is getting deployed, let’s create a bucket.

    • Name your bucket: mysql-init-axual-connect (or something different, in case the name is already taken).
      Skip labels.
      Click "Continue".
      The bucket name is irrelevant for our example: It will only be displayed in your Google overview.

    • Choose where to store your data:

      • Region: europe-west1 (you don’t need Multi or dual region. Single region is enough)
        The region isn’t very relevant for our example, but usually you would select a region geographically closest to the Connect-Cluster.

    • Choose a default storage class for your data: Standard.
      Click "Continue".

    • Choose how to control access to objects: Uniform.
      Click "Continue".

    • Choose how to protect object data: None

      • Data encryption: Skip this config

  3. Click "Create" to create the bucket.
    If you get a pop-up with "Public access will be prevented", click "Confirm".

  4. The bucket page will open. Click "Upload files"

  5. Save the following text to a file on your system, and then click "Upload files" to upload it into the bucket:

    Create the test table and seed data:

    USE mysql_database_name;
    
    CREATE TABLE HOTEL
    (
      hotel_id    INT           NOT NULL AUTO_INCREMENT,
      hotel_name  VARCHAR(100)  NULL,
      PRIMARY KEY (hotel_id)
    );
    
    INSERT INTO `HOTEL` VALUES
    (1, 'iris'),
    (2, 'altair'),
    (3, 'turbulence');

  6. You can close the buckets page. Let’s go back to our SQL instance.
    Select your instance to open it. Note down the public IP address.
    We’ll use it when configuring the Connector-Application later.

  7. [Optional] Change the root user’s password:

    • On the left-side menu, click "Users".

    • Click the dots next to the root user.

    • Click "Change password".

    • Type a new password and click "OK".

  8. On the left-side menu, click "Databases".

    • Click "Create Database".
      Use mysql_database_name as the name, since this is the name used in this example instance-setup (we referenced it in the SQL snippet above, and we’ll do it again in the connector configuration).

      • Click "Create".

  9. On the left-side menu, click "Overview". Click the "Import" button.

    • Source: click "Browse".

      • Double-click your bucket and select the SQL file we saved earlier.

      • Click "SELECT" at the bottom of the page.

    • File format: SQL

    • Destination: mysql_database_name

    • Click "Import"

Axual name-resolving properties

Some properties require Axual’s custom name-resolving mechanism.
Consult an Axual platform operator to acquire the following information:

Tenant shortName

Example value:
axual

Instance shortName

Example value:
local

Environment shortName

Example value:
default

Connect-Application shortName

This value is already provided. We’ll use:
my_hotels_app

Resolving pattern for topic-names

Example pattern:
${tenant-shortName}-${instance-shortName}-${environment-shortName}-${topic-name}
Example resolved topic name:
axual-local-default-transactions

Resolving pattern for consumer-groups

Example pattern:
${tenant-shortName}-${instance-shortName}-${environment-shortName}-${consumerGroup-name}
Example resolved consumer group:
axual-local-default-com.axual.transactionProcessor
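
The example patterns above amount to joining the three short names and the unresolved name with hyphens. A minimal Python sketch, assuming your installation uses exactly this example pattern (the actual pattern is installation-specific, so confirm it with your platform operator); resolve_name is a hypothetical helper, not part of any Axual library:

```python
def resolve_name(tenant: str, instance: str, environment: str, name: str) -> str:
    """Apply the example pattern <tenant>-<instance>-<environment>-<name>.

    The same shape is used above for topic names and consumer groups.
    """
    return "-".join([tenant, instance, environment, name])


if __name__ == "__main__":
    # Values used elsewhere in this guide.
    print(resolve_name("axual", "local", "default", "mysql_nickname._schema-changes"))
    print(resolve_name("axual", "local", "default", "mysql_nickname-dbhistory"))
```

The two printed values correspond to the resolved schema.history.internal.kafka.topic and schema.history.internal.consumer.group.id used in this guide.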

Axual streams

Follow the Creating streams documentation to create and deploy the following streams:

  • mysql_nickname: Contains schema change events for the captured database.
    You can choose any value for this stream name, but it must be unique within the Connect cluster. It is the value used for topic.prefix in the connector configuration.

  • mysql_nickname._schema-changes: Stores the database schema history.
    The stream name can be freely chosen; we use <nickname>._schema-changes for consistency.

  • mysql_nickname.mysql_database_name.HOTEL: Contains DB events for the HOTEL table.
    The stream name follows the pattern <nickname>.<database>.<table>. Create one stream per watched table.

All streams should use Key/Value type String/String, Partition count 1, Retention Time 157680000000 ms (5 years).
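
When more than one table is captured, the list of streams to create grows with it. The sketch below derives the stream names from the nickname, database, and table list, and shows where the retention value comes from (five 365-day years expressed in milliseconds). The helper stream_names is hypothetical and only mirrors the naming used in this guide.

```python
from datetime import timedelta


def stream_names(nickname, database, tables):
    """Return the streams this guide creates for one connector."""
    names = [
        nickname,                        # schema change events
        f"{nickname}._schema-changes",   # schema history (name freely chosen)
    ]
    # One data stream per captured table: <nickname>.<database>.<table>
    names += [f"{nickname}.{database}.{table}" for table in tables]
    return names


# Five years of 365 days, in milliseconds.
RETENTION_MS = int(timedelta(days=5 * 365).total_seconds() * 1000)

if __name__ == "__main__":
    for name in stream_names("mysql_nickname", "mysql_database_name", ["HOTEL"]):
        print(name)
    print(RETENTION_MS)
```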

Steps

Step 1 — Create a connector application

  1. Follow the Configure and install a connector documentation to set up a new Connector-Application.
    Let’s call it my_hotels_app.
    The plugin name is io.debezium.connector.mysql.MySqlConnector.
    If a plugin isn’t available, ask a platform operator to install plugins.

    Use the following application ID: mysql_nickname-dbhistory.

Step 2 — Configure the connector

  1. Provide the following configuration:

    database.hostname

    Example value:
    123.123.123.123

    database.port

    3306

    database.user

    root

    database.password

    Example value:
    1234abcdEFG

    topic.prefix

    mysql_nickname

    database.server.id

    184054

    database.include.list

    mysql_database_name

    table.include.list

    Example value:
    mysql_database_name.HOTEL

    schema.history.internal.kafka.bootstrap.servers

    Example values:
    - Local (helm platform): platform.local:31757
    - Axual-Cloud rokin cluster: bootstrap-c1-ams01-azure.axual.cloud:9094
    - SAAS trial: oberon-kafka-bootstrap.axual.cloud:9094

    schema.history.internal.kafka.topic

    This topic name must be resolved! Use the name-resolving values acquired above.

    Example resolved value:
    axual-local-default-mysql_nickname._schema-changes

  2. Supply the Axual Platform-required properties described in the Axual Platform requirements section.
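
A common mistake in this step is listing a table whose database is missing from database.include.list. The following sketch is a simple pre-flight check for that. It treats both properties as plain comma-separated names, which is a simplification: Debezium also accepts regular expressions in these lists, and this hypothetical check_include_lists helper does not evaluate them.

```python
def check_include_lists(config: dict) -> list:
    """Return a list of human-readable problems; empty means nothing was found.

    Simplification: entries are compared as literal names, not as the
    regular expressions that Debezium also accepts.
    """
    problems = []
    databases = {
        d.strip()
        for d in config.get("database.include.list", "").split(",")
        if d.strip()
    }
    for entry in config.get("table.include.list", "").split(","):
        entry = entry.strip()
        if not entry:
            continue
        database, sep, table = entry.partition(".")
        if not sep or not table:
            problems.append(f"{entry!r} is not in <database>.<table> form")
        elif databases and database not in databases:
            problems.append(f"{entry!r} refers to a database outside database.include.list")
    return problems


if __name__ == "__main__":
    example = {
        "database.include.list": "mysql_database_name",
        "table.include.list": "mysql_database_name.HOTEL",
    }
    print(check_include_lists(example) or "include lists look consistent")
```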

Step 3 — Start the connector

Authorize the my_hotels_app source connector application to produce to and consume from all three streams:

  • mysql_nickname

  • mysql_nickname._schema-changes

  • mysql_nickname.mysql_database_name.HOTEL

Then start the connector application from Axual Self-Service.

Step 4 — Verify

In Axual Self-Service, use stream-browse on mysql_nickname.mysql_database_name.HOTEL to confirm CDC events are arriving when rows are inserted or updated in MySQL.
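
To interpret the records you see while browsing, it helps to know the shape of a Debezium change event. The sketch below assumes the JSON converter with schemas disabled, so the record value is the event envelope itself with before, after, and op fields, where op is "c" (create), "u" (update), "d" (delete), or "r" (snapshot read). The sample value is hand-written for illustration and is not captured connector output; a real event also carries a source block and other fields that are omitted here. The summarize helper is hypothetical.

```python
import json
from typing import Optional

# Operation codes used in the Debezium event envelope.
OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def summarize(raw_value: Optional[str]) -> str:
    """Describe one record value from the HOTEL stream."""
    if raw_value is None:
        # A record with a null value (tombstone) can follow a delete event.
        return "tombstone"
    event = json.loads(raw_value)
    operation = OPERATIONS.get(event.get("op"), "unknown")
    # Deletes have no "after" state, so fall back to the "before" state.
    row = event.get("after") if event.get("after") is not None else event.get("before")
    return f"{operation}: {row}"


if __name__ == "__main__":
    # Hand-written sample; the row values are made up.
    sample = json.dumps({
        "before": None,
        "after": {"hotel_id": 4, "hotel_name": "vega"},
        "op": "c",
        "ts_ms": 0,
    })
    print(summarize(sample))
```

Rows loaded before the connector started normally arrive as snapshot reads, while rows inserted afterwards arrive as inserts.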

Cleanup

When you are done:

  1. Stop the connector application in Axual Self-Service.

  2. Remove stream access for the application if no longer needed.

  3. Delete your Cloud Storage bucket and your SQL instance if they were created only for testing.

Known limitations

  • MySQL binlog must be enabled on the server.

  • The schema.history.internal.kafka.topic value must be resolved using Axual’s name-resolving mechanism before the connector can be started.

  • SSL configuration values must use Axual’s ${keyvault:…} reference syntax.
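
The binlog requirement can be checked before starting the connector by inspecting the server variables, for example from the output of SHOW VARIABLES. The sketch below takes those variables as a dictionary and reports mismatches. The expected values for binlog_format (ROW) and binlog_row_image (FULL) are an assumption taken from the upstream Debezium MySQL documentation, not from this page, and binlog_problems is a hypothetical helper; fetching the variables from the server is left out on purpose.

```python
# Expected settings. log_bin comes from the limitation above; the other two
# are assumptions based on the upstream Debezium MySQL documentation.
REQUIRED = {"log_bin": "ON", "binlog_format": "ROW", "binlog_row_image": "FULL"}


def binlog_problems(variables: dict) -> list:
    """Compare server variables against the expected binlog settings."""
    problems = []
    for name, expected in REQUIRED.items():
        actual = str(variables.get(name, "")).upper()
        if actual != expected:
            problems.append(f"{name} is {actual or 'unset'}, expected {expected}")
    return problems


if __name__ == "__main__":
    # Example input, as you might collect it from SHOW VARIABLES.
    server = {"log_bin": "ON", "binlog_format": "ROW", "binlog_row_image": "FULL"}
    print(binlog_problems(server) or "binlog settings look fine")
```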

Examples

Full configuration on Axual Platform

Derived from the Terraform Boilerplate.

{
  "name": "my-debezium-mysql-source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql.example.com",
    "database.port": "3306",
    "database.user": "debezium_user",
    "database.password": "Deb3zium@MysqlPass",
    "database.server.id": "184054",
    "database.include.list": "mysql_database_name",
    "table.include.list": "mysql_database_name.HOTEL",
    "topic.prefix": "mysql_nickname",
    "topic.creation.enable": "false",
    "include.schema.changes": "true",
    "snapshot.mode": "initial",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "schema.history.internal.kafka.bootstrap.servers": "kafka.axual.example.com:9093",
    "schema.history.internal.kafka.topic": "axual-myinstance-exampleenvironment-schema-history-mysql",
    "schema.history.internal.consumer.group.id": "my-debezium-mysql-app",
    "schema.history.internal.consumer.security.protocol": "SSL",
    "schema.history.internal.consumer.ssl.keystore.location": "${keyvault:connectors/<tenant>/<instance>/<environment>/<application>:ssl.keystore.location}",
    "schema.history.internal.consumer.ssl.keystore.password": "${keyvault:connectors/<tenant>/<instance>/<environment>/<application>:ssl.keystore.password}",
    "schema.history.internal.consumer.ssl.key.password": "${keyvault:connectors/<tenant>/<instance>/<environment>/<application>:ssl.key.password}",
    "schema.history.internal.consumer.ssl.truststore.location": "${keyvault:connectors/<tenant>/<instance>/<environment>/<application>:ssl.truststore.location}",
    "schema.history.internal.consumer.ssl.truststore.password": "${keyvault:connectors/<tenant>/<instance>/<environment>/<application>:ssl.truststore.password}",
    "schema.history.internal.producer.security.protocol": "SSL",
    "schema.history.internal.producer.ssl.keystore.location": "${keyvault:connectors/<tenant>/<instance>/<environment>/<application>:ssl.keystore.location}",
    "schema.history.internal.producer.ssl.keystore.password": "${keyvault:connectors/<tenant>/<instance>/<environment>/<application>:ssl.keystore.password}",
    "schema.history.internal.producer.ssl.key.password": "${keyvault:connectors/<tenant>/<instance>/<environment>/<application>:ssl.key.password}",
    "schema.history.internal.producer.ssl.truststore.location": "${keyvault:connectors/<tenant>/<instance>/<environment>/<application>:ssl.truststore.location}",
    "schema.history.internal.producer.ssl.truststore.password": "${keyvault:connectors/<tenant>/<instance>/<environment>/<application>:ssl.truststore.password}",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "errors.tolerance": "none"
  }
}
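
The configuration above still contains <tenant>, <instance>, <environment>, and <application> placeholders in its keyvault references. The sketch below fills them in and reports any property that still holds a placeholder, which is a cheap check to run before submitting the configuration. The helpers fill and unresolved are hypothetical, and the placeholder detection (lowercase words in angle brackets) is an assumption that fits this example only.

```python
import re

# Matches placeholders such as <tenant> or <application>.
PLACEHOLDER = re.compile(r"<[a-z]+>")


def fill(config: dict, **values: str) -> dict:
    """Return a copy of config with <name> placeholders replaced."""
    filled = {}
    for key, value in config.items():
        text = str(value)
        for name, replacement in values.items():
            text = text.replace(f"<{name}>", replacement)
        filled[key] = text
    return filled


def unresolved(config: dict) -> list:
    """Return the property names whose values still contain a placeholder."""
    return sorted(k for k, v in config.items() if PLACEHOLDER.search(str(v)))


if __name__ == "__main__":
    # One property from the example above, shortened for illustration.
    config = {
        "schema.history.internal.consumer.ssl.key.password":
            "${keyvault:connectors/<tenant>/<instance>/<environment>/<application>:ssl.key.password}",
    }
    done = fill(config, tenant="axual", instance="local",
                environment="default", application="my_hotels_app")
    print(done)
    print(unresolved(done))
```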

License

The Debezium MySQL source connector is licensed under the Apache License, Version 2.0.