JDBC Sink Connector — SQL Server

Type: Sink

Class: io.aiven.connect.jdbc.JdbcSinkConnector

Target System: Database (SQL Server)

Maintainer: Aiven

License: Apache License 2.0

Project: github.com/aiven/jdbc-connector-for-apache-kafka

Download: Maven Central

This page documents version 6.12.0. Newer versions should be compatible unless there are breaking changes, but field names or default values may differ. If you notice discrepancies, please contact Axual Support.

Description

The JDBC Sink Connector for SQL Server consumes records from Kafka topics and writes them into SQL Server tables.

It is maintained by Aiven as part of the open-source project github.com/aiven/jdbc-connector-for-apache-kafka.

Features

  • Write Kafka records into SQL Server tables

  • Configurable write modes: insert and upsert

  • Primary key extraction from record key, value, or Kafka coordinates

  • Flexible table name mapping via configurable pattern

  • Schema-aware: requires JSON (with schema) or Avro converters
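As a rough illustration of the primary key sources listed above, the sketch below picks key columns from a record according to a pk.mode value. It is not the connector's code: the Record type, the extract_pk helper, the category_name field, and the __connect_* column names are assumptions made for this example.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class Record:
    """Hypothetical stand-in for a Kafka record as seen by a sink task."""
    topic: str
    partition: int
    offset: int
    key: Dict[str, Any]
    value: Dict[str, Any]


def extract_pk(record: Record, pk_mode: str, pk_fields: List[str]) -> Dict[str, Any]:
    """Return the primary key columns for a record under the given pk.mode."""
    if pk_mode == "record_key":
        source = record.key
    elif pk_mode == "record_value":
        source = record.value
    elif pk_mode == "kafka":
        # Kafka coordinates; the column names used here are illustrative.
        return {
            "__connect_topic": record.topic,
            "__connect_partition": record.partition,
            "__connect_offset": record.offset,
        }
    else:
        raise ValueError(f"unsupported pk.mode: {pk_mode}")
    # In this sketch an empty pk_fields list means "use every field".
    names = pk_fields or list(source)
    return {name: source[name] for name in names}


record = Record(
    topic="sqlserver_demo.categories",
    partition=0,
    offset=42,
    key={"category_id": 7},
    value={"category_id": 7, "category_name": "Beverages"},
)
print(extract_pk(record, "record_value", ["category_id"]))
print(extract_pk(record, "kafka", []))
```

With pk.mode set to record_value and pk.fields set to category_id, only that field of the value is used as the key, which matches the configuration shown later on this page.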

When to Use

  • You need to write Kafka records into a SQL Server table as part of a data pipeline.

  • You want to materialise a Kafka topic as a database table for downstream querying.

  • You need upsert semantics to keep a SQL Server table in sync with a Kafka topic.
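To make the difference between the two write modes concrete, here is a small in-memory model. It only mimics the semantics; it does not reflect the SQL the connector issues, and it ignores database constraints (a real table with a primary key would reject the duplicate insert). The sample rows and helper names are invented for illustration.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def apply_insert(table: List[Row], row: Row) -> None:
    """insert mode: every record becomes a new row."""
    table.append(dict(row))


def apply_upsert(table: Dict[Any, Row], row: Row, pk_field: str) -> None:
    """upsert mode: replace the row with the same key, or add it if absent."""
    table[row[pk_field]] = dict(row)


records = [
    {"category_id": 1, "category_name": "Beverages"},
    {"category_id": 2, "category_name": "Condiments"},
    {"category_id": 1, "category_name": "Drinks"},  # later update for key 1
]

inserted: List[Row] = []
upserted: Dict[Any, Row] = {}
for rec in records:
    apply_insert(inserted, rec)
    apply_upsert(upserted, rec, "category_id")

print(len(inserted))                 # 3 rows, one per record
print(len(upserted))                 # 2 rows, one per key
print(upserted[1]["category_name"])  # Drinks
```

Under upsert, replaying the topic leaves one row per key holding the latest value, which is what keeps the table in sync with the topic.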

When NOT to Use

  • Your Kafka records do not carry a schema — the connector requires the schema envelope to construct INSERT statements. Use a schema-aware converter upstream.

  • Your database schema changes frequently. Schema evolution is not supported, so every change requires manual reconfiguration.

Installation

The connector is available from Maven Central.

  1. Navigate to the releases page and select the version matching your Kafka Connect installation.

  2. Download the JAR file.

For installation steps, see Installing Connector Plugins.

Configuration

For the complete configuration reference, see the official sink connector documentation.

To configure a connector in Axual Self-Service, see Starting Connectors.

TIP: To speed up your deployment, use the Terraform Boilerplate or the Management API Boilerplate.

Getting Started

This section walks through setting up the JDBC Sink Connector to write records from a Kafka stream into a SQL Server table.

The example assumes a stream named sqlserver_demo.categories already contains records with a schema envelope (e.g. produced by the JDBC Source Connector).

Prerequisites

SQL Server instance

You need a running SQL Server instance reachable from the Kafka Connect cluster, with a database and user that has write permissions.

If you do not have a SQL Server instance yet, see the JDBC Source Connector prerequisites for Google Cloud setup instructions.

Axual stream with schema-enabled records

The Kafka stream this connector consumes must contain records with a schema envelope. If you are using JsonConverter, the producer must set value.converter.schemas.enable=true.

If the stream was produced by the JDBC Source Connector with value.converter.schemas.enable=true, it is already in the correct format.
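For orientation, the sketch below builds a record value in the shape JsonConverter produces when schemas are enabled (a top-level "schema" and "payload") and checks a raw value for that shape. The category_name field and the has_schema_envelope helper are assumptions for illustration; only category_id appears elsewhere on this page.

```python
import json

# A record value with a schema envelope. Field names other than
# category_id are hypothetical.
envelope = {
    "schema": {
        "type": "struct",
        "optional": False,
        "fields": [
            {"field": "category_id", "type": "int32", "optional": False},
            {"field": "category_name", "type": "string", "optional": True},
        ],
    },
    "payload": {"category_id": 1, "category_name": "Beverages"},
}


def has_schema_envelope(raw: bytes) -> bool:
    """Check that a JSON record value has top-level "schema" and "payload" keys."""
    try:
        doc = json.loads(raw)
    except ValueError:
        # Not valid JSON (or not valid UTF-8), so no envelope either.
        return False
    return isinstance(doc, dict) and "schema" in doc and "payload" in doc


with_schema = json.dumps(envelope).encode("utf-8")
without_schema = json.dumps(envelope["payload"]).encode("utf-8")
print(has_schema_envelope(with_schema))     # True
print(has_schema_envelope(without_schema))  # False
```

A value like without_schema, which contains only the payload fields, is the kind of record this connector cannot write.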

Steps

Step 1 — Create a connector application

  1. Follow the Configure and install a connector documentation to set up a new Connector-Application.
    Let’s call it my.sqlserver.sink.
    The plugin name is io.aiven.connect.jdbc.JdbcSinkConnector.
    If the plugin isn’t available, ask a platform operator to install plugins.

Step 2 — Configure the connector

  1. Provide the following minimal configuration:

    connector.class

    io.aiven.connect.jdbc.JdbcSinkConnector

    connection.url

    jdbc:sqlserver://PASTE_THE_IP_ADDRESS:1433;databaseName=database_name;

    connection.user

    sqlserver

    connection.password

    Your database password

    insert.mode

    upsert

    pk.fields

    category_id

    pk.mode

    record_value
    Extract the primary key from the Kafka record value.

    table.name.format

    kafka_${topic}
    Writes into the table kafka_sqlserver_demo.categories.

    topics

    sqlserver_demo.categories
    Case-sensitive.

    value.converter

    org.apache.kafka.connect.json.JsonConverter

    value.converter.schemas.enable

    true

    For advanced options, see the official sink connector documentation.

Step 3 — Start the connector

Start the connector application from Axual Self-Service. Once running, records from sqlserver_demo.categories will be written to the kafka_sqlserver_demo.categories table in SQL Server.
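The table name above follows from substituting the topic name into table.name.format. A minimal sketch of that substitution, using a hypothetical resolve_table_name helper rather than the connector's own code:

```python
def resolve_table_name(table_name_format: str, topic: str) -> str:
    """Replace the ${topic} placeholder with the topic name."""
    return table_name_format.replace("${topic}", topic)


print(resolve_table_name("kafka_${topic}", "sqlserver_demo.categories"))
# kafka_sqlserver_demo.categories
```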

Step 4 — Verify

Verify the data using the Google Cloud Shell:

gcloud sql connect my-axual-sqlserver-connector-test --user=sqlserver

Then query the table:

USE database_name;
SELECT * FROM kafka_sqlserver_demo.categories;

Cleanup

When you are done:

  1. Stop the connector application in Axual Self-Service.

  2. Remove stream access for the application if no longer needed.

  3. Delete your SQL Server instance if it was created only for testing.

Known limitations

  • The connector requires the Kafka record value to carry its schema. With JsonConverter, value.converter.schemas.enable must be true.

  • Schema evolution is not supported — changes to the record schema require manual reconfiguration.

  • table.name.format values are case-sensitive on SQL Server when using quoted identifiers.

Examples

Minimal configuration

{
  "name": "my-jdbc-sqlserver-sink",
  "config": {
    "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:sqlserver://123.123.123.123:1433;databaseName=database_name;",
    "connection.user": "sqlserver",
    "connection.password": "<your-database-password>",
    "insert.mode": "upsert",
    "pk.fields": "category_id",
    "pk.mode": "record_value",
    "table.name.format": "kafka_${topic}",
    "topics": "sqlserver_demo.categories",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
  }
}
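Before deploying a configuration like the one above, a quick local sanity check can catch common mistakes. The sketch below is illustrative only: check_sink_config is a hypothetical helper, its rules are drawn from this page rather than from the connector's own validation, and it assumes that pk.mode defaults to none and that JsonConverter enables schemas when the setting is absent.

```python
import json
from typing import Dict, List

EXAMPLE = """
{
  "name": "my-jdbc-sqlserver-sink",
  "config": {
    "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:sqlserver://123.123.123.123:1433;databaseName=database_name;",
    "insert.mode": "upsert",
    "pk.fields": "category_id",
    "pk.mode": "record_value",
    "topics": "sqlserver_demo.categories",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
  }
}
"""


def check_sink_config(config: Dict[str, str]) -> List[str]:
    """Return a list of human-readable problems; an empty list means none found."""
    problems: List[str] = []
    for key in ("connector.class", "connection.url", "topics"):
        if not config.get(key):
            problems.append(f"missing {key}")
    url = config.get("connection.url", "")
    if url and not url.startswith("jdbc:sqlserver://"):
        problems.append("connection.url is not a SQL Server JDBC URL")
    # Assumption: pk.mode defaults to "none" when it is not set.
    if config.get("insert.mode") == "upsert" and config.get("pk.mode", "none") == "none":
        problems.append("upsert needs a pk.mode other than none")
    # Assumption: JsonConverter treats a missing schemas.enable as true.
    uses_json = config.get("value.converter", "").endswith("JsonConverter")
    if uses_json and config.get("value.converter.schemas.enable", "true") != "true":
        problems.append("JsonConverter must have schemas.enable=true")
    return problems


config = json.loads(EXAMPLE)["config"]
print(check_sink_config(config))  # []

broken = dict(config, **{"pk.mode": "none", "value.converter.schemas.enable": "false"})
for problem in check_sink_config(broken):
    print(problem)
```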

License

The JDBC Sink Connector for SQL Server is licensed under the Apache License, Version 2.0.