JDBC Source Connector — MySQL

Type

Source

Class

io.aiven.connect.jdbc.JdbcSourceConnector

Target System

Database (MySQL)

Maintainer

Aiven

License

Apache License 2.0

Project

github.com/aiven/jdbc-connector-for-apache-kafka

Download

Maven Central

This page documents version 6.12.0. Newer versions should be compatible unless there are breaking changes, but field names or default values may differ. If you notice discrepancies, please contact Axual Support.

Description

The JDBC Source Connector for MySQL periodically polls MySQL tables and publishes new or changed rows as records to Kafka topics.

It is maintained by Aiven as part of the open-source github.com/aiven/jdbc-connector-for-apache-kafka.

Features

  • Poll MySQL tables and publish rows to Kafka topics

  • Multiple polling modes: incrementing, timestamp, timestamp+incrementing, and bulk

  • Configurable table whitelist and topic prefix for flexible topic naming

  • Schema-aware: works with JSON (with schema) and Avro converters

  • Offset tracking ensures rows are not re-published across connector restarts

When to Use

  • You need to periodically ingest rows from a MySQL table into Kafka.

  • Your use case tolerates polling latency — near-real-time is sufficient and millisecond CDC latency is not required.

  • You want a simple database integration without the operational overhead of CDC.

When NOT to Use

  • You need real-time change data capture — use Debezium MySQL instead.

  • Your database schema changes frequently — this connector does not support schema evolution and requires manual reconfiguration on schema changes.

Installation

The connector is available from the Maven Central.

  1. Navigate to the releases page and select the version matching your Kafka Connect installation.

  2. Download the JAR file.

For installation steps, see Installing Connector Plugins.

Configuration

For the complete configuration reference, see the official source connector documentation.

To configure a connector in Axual Self-Service, see Starting Connectors. TIP: To speed up your deployment, use the Terraform Boilerplate or the Management API Boilerplate.

Getting Started

This section walks through setting up the JDBC Source Connector using a MySQL instance on Google Cloud.

Prerequisites

MySQL instance

You need a running MySQL instance reachable from the Kafka Connect cluster.

We’ll deploy a MySQL instance using Google Cloud Services.
If you have a Google account, you can sign up for a free trial of Google cloud.

You can read more about MySQL on Google Cloud here.

  1. Create a new MySQL instance.

    • instance ID: my-axual-mysql-connector-test
      The instance ID is irrelevant for our example: It will only be displayed in your Google overview.

    • Password: You can set the password for the root user of the database. We’ll use this password when configuring the Connector-Application later.
      You can reset this password later.

    • Database: Select "{database-type}" (likely the default option).

    • Choose a configuration to start with: Development

    • Region: europe-west1
      The region isn’t very irrelevant for our example, but usually you would select a region geographically closest to the Connect-Cluster.

    • Zonal availability: Single zone

    • Customize your instance: Click "Show configuration options"

      • Machine type: Click the dropdown menu and select Lightweight. 1vCPU is enough.

      • Storage: go for the least amount of storage.

        • If available, Use HDDs.

        • If given the option, do not enable automatic storage increases.

      • Connections: Leave only "Public IP" selected.

        • Authorized networks. Click "Add network". Use any name and 0.0.0.0/0 as the CIDR notation (and click "Done").
          This will open the database to the entire internet. That’s ok, we’ll delete it shortly anyway.

      • Authentication: Skip this config if it’s present

      • Data protection:

        • Enable backups. Under "Advanced options", select (single) Region (europe-west1)

        • If available, enable point-in-time recovery

        • Disable deletion protection

      • Maintenance: Skip this config

      • Flags and parameters: Skip this config

      • Labels: Skip this config

    • Click "Create instance".

  2. While the database server is getting deployed, let’s create a bucket.

    • Name your bucket: mysql-init-axual-connect (or something different, in case the name is already taken).
      Skip labels.
      Click "Continue".
      The bucket name is irrelevant for our example: It will only be displayed in your Google overview.

    • Choose where to store your data:

      • Region: europe-west1 (you don’t need Multi or dual region. Single region is enough)
        The region isn’t very relevant for our example, but usually you would select a region geographically closest to the Connect-Cluster.

    • Choose a default storage class for your data: Standard.
      Click "Continue".

    • Choose how to control access to objects: Uniform.
      Click "Continue".

    • Choose how to protect object data: None

      • Data encryption: Skip this config

  3. Click "Create" to create the bucket.
    If you get a pop-up with "Public access will be prevented", click "Confirm".

  4. The bucket page will open. Click "Upload files"

  5. Save the following text to a file on your system, and then click "Upload files" to upload it into the bucket:

    Create the test table and seed data:

    USE mysql_database_name;
    
    CREATE TABLE HOTEL
    (
      hotel_id    INT           NOT NULL AUTO_INCREMENT,
      hotel_name  VARCHAR(100)  NULL,
      PRIMARY KEY (hotel_id)
    );
    
    INSERT INTO `HOTEL` VALUES
    (1, 'iris'),
    (2, 'altair'),
    (3, 'turbulence');
  6. You can close the buckets page. Let’s go back to our SQL instance.
    Select your instance to open it. Note down the public IP address.
    We’ll use it when configuring the Connector-Application later.

  7. [Optional] Change the root user’s password:

    • On the left-side menu, click "Users".

    • Click the dots next to the root user.

    • Click "Change password".

    • Type a new password and click "OK".

  8. On the left-side menu, click "Databases".

    • Click "Create Database".
      Use mysql_database_name as the name, since this is the name used in this example instance-setup (we referenced it in the SQL snippet above, and we’ll do it again in the connector configuration).

      • Click "Create".

  9. On the left-side menu, click "Overview". Click the "Import" button.

    • Source: click "Browse".

      • Double-click your bucket and select the SQL file we saved earlier.

      • Click "SELECT" at the bottom of the page.

    • File format: SQL

    • Destination: mysql_database_name

    • Click "Import"

Axual stream

The stream where the connector will produce events must already exist in Axual Self-Service. See Creating streams if you need to create it.

Steps

Step 1 — Create a connector application

  1. Follow the Creating streams documentation in order to create one stream and deploy it onto an environment.
    The name of the stream will be mysql_HOTEL.
    The key/value types will be JSON/JSON.

  2. Follow the Configure and install a connector documentation to set up a new Connector-Application.
    Let’s call it my.mysql.source.
    The plugin name is io.aiven.connect.jdbc.JdbcSourceConnector.
    If a plugin isn’t available, ask a platform operator to install plugins.

Step 2 — Configure the connector

  1. Provide the following minimal configuration:

    connector.class

    io.aiven.connect.jdbc.JdbcSourceConnector

    connection.url

    jdbc:mysql://PASTE_THE_IP_ADDRESS:3306/mysql_database_name
    You noted down the instance IP address after creating the MySQL instance.

    connection.user

    root

    connection.password

    Your database password

    mode

    incrementing

    incrementing.column.name

    hotel_id
    The primary key column.

    table.whitelist

    HOTEL
    Case-sensitive.

    topic.prefix

    mysql_

    value.converter

    org.apache.kafka.connect.json.JsonConverter

    value.converter.schemas.enable

    true
    Include the schema envelope so the sink connector can parse each event.

    For advanced options, see the official source connector documentation.

Step 3 — Start the connector

Start the connector application from Axual Self-Service. Once running, new rows inserted into the HOTEL table will be published to the mysql_HOTEL stream.

Step 4 — Verify

In Axual Self-Service, use stream-browse on mysql_HOTEL to confirm events are arriving. An example event:

{
  "key": null,
  "value": {
    "schema": {
      "name": "HOTEL",
      "type": "struct",
      "optional": false,
      "fields": [
        { "field": "hotel_id",   "type": "int32",  "optional": false },
        { "field": "hotel_name", "type": "string", "optional": true }
      ]
    },
    "payload": {
      "hotel_id": 1,
      "hotel_name": "iris"
    }
  }
}

The schema field is present because value.converter.schemas.enable=true. The JDBC Sink Connector uses this schema to construct INSERT statements.

Cleanup

When you are done:

  1. Stop the connector application in Axual Self-Service.

  2. Remove stream access for the application if no longer needed.

  3. Delete your Cloud Storage bucket and your SQL instance if they were created only for testing.

Known limitations

  • This connector uses polling — it does not provide real-time change detection. Use Debezium MySQL for CDC.

  • table.whitelist values are case-sensitive — the table name must match exactly as it appears in MySQL.

  • Schema evolution is not supported — changes to the source table schema require manual reconfiguration.

Examples

Minimal configuration

{
  "name": "my-jdbc-mysql-source",
  "config": {
    "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://123.123.123.123:3306/mysql_database_name",
    "connection.user": "root",
    "connection.password": "<your-database-password>",
    "mode": "incrementing",
    "incrementing.column.name": "hotel_id",
    "table.whitelist": "HOTEL",
    "topic.prefix": "mysql_",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
  }
}

License

MySQL JDBC Connector is licensed under the Apache License, Version 2.0.