JDBC source and sink Connector for SQL Server, version 6.7.0

JDBC Connector for SQL Server

If you already have a publicly accessible deployment of SQL Server, you can skip ahead to Configure and install a connector using the Axual Self-Service UI, following the configuration instructions below.
If you don’t have one available, the following section walks you through deploying a publicly accessible SQL Server instance.

Deploying a SQL Server instance

We’ll deploy a SQL Server instance using Google Cloud.
If you have a Google account, you can sign up for a free trial of Google Cloud.

You can read more about SQL Server on Google Cloud here.

Do not extend this guide over multiple days, as SQL Server deployed via Google Cloud might incur a licensing fee that greatly exceeds the cost of the server itself. Go through this use case in a day, and finish it off by decommissioning all your resources at the end.

Let’s get started.

  1. Create a new SQL Server instance.

    • instance ID: my-axual-sqlserver-connector-test
      The instance ID is irrelevant for our example: It will only be displayed in your Google overview.

    • Password: You can set the password for the sqlserver user of the database. We’ll use this password when configuring the Connector-Application later.
      You can reset this password later.

    • Database: Select "SQL Server 2019 Standard" (likely the default option).

    • Choose a configuration to start with: Development

    • Region: europe-west1
      The region isn’t very relevant for our example, but usually you would select a region geographically closest to the Connect-Cluster.

    • Zonal availability: Single zone

    • Customize your instance: Click "Show configuration options"

      • Machine type: Click the dropdown menu and select Lightweight. 1 vCPU is enough.

      • Storage: go for the least amount of storage.

        • If available, use HDDs.

        • If given the option, do not enable automatic storage increases.

      • Connections: Leave only "Public IP" selected.

        • Authorized networks: Click "Add network". Use any name and 0.0.0.0/0 as the network (CIDR notation), then click "Done".
          This will open the database to the entire internet. That’s ok, we’ll delete it shortly anyway.

      • Authentication: Skip this config if it’s present

      • Data protection:

        • Enable backups. Under "Advanced options", select (single) Region (europe-west1)

        • If available, enable point-in-time recovery

        • Disable deletion protection

      • Maintenance: Skip this config

      • Flags and parameters: Skip this config

      • Labels: Skip this config

    • Click "Create instance".

  2. While the database server is getting deployed, let’s create a bucket.

    • Name your bucket: sqlserver-init-axual-connect (or something different, in case the name is already taken).
      Skip labels.
      Click "Continue".
      The bucket name is irrelevant for our example: It will only be displayed in your Google overview.

    • Choose where to store your data:

      • Region: europe-west1 (you don’t need Multi or dual region. Single region is enough)
        The region isn’t very relevant for our example, but usually you would select a region geographically closest to the Connect-Cluster.

    • Choose a default storage class for your data: Standard.
      Click "Continue".

    • Choose how to control access to objects: Uniform.
      Click "Continue".

    • Choose how to protect object data: None

      • Data encryption: Skip this config

  3. Click "Create" to create the bucket.
    If you get a pop-up with "Public access will be prevented", click "Confirm".

  4. The bucket page will open. Click "Upload files"

  5. Save the following text to a file on your system, and then click "Upload files" to upload it into the bucket:

    CREATE SCHEMA demo
    GO
    
    CREATE SCHEMA kafka_sqlserver_demo
    GO
    
    -- Source table read by the JDBC source connector
    CREATE TABLE demo.categories (
    	category_id   INT IDENTITY  PRIMARY KEY,
    	category_name VARCHAR (255) NOT NULL
    );
    
    -- Sink table written by the JDBC sink connector (IDs come from Kafka, so no IDENTITY)
    CREATE TABLE kafka_sqlserver_demo.categories (
    	category_id   INT           PRIMARY KEY,
    	category_name VARCHAR (255) NOT NULL
    );
    
    SET IDENTITY_INSERT demo.categories ON;
    INSERT INTO demo.categories (category_id, category_name)
    VALUES (1,'Names'), (2,'Places'), (3,'Hobbies');
  6. You can close the buckets page. Let’s go back to our SQL instance.
    Select your instance to open it. Note down the public IP address.
    We’ll use it when configuring the Connector-Application later.

  7. [Optional] Change the sqlserver user’s password:

    • On the left-side menu, click "Users".

    • Click the dots next to the sqlserver user.

    • Click "Change password".

    • Type a new password and click "OK".

  8. On the left-side menu, click "Databases".

    • Click "Create Database".
      Use database_name as the name, since this is the name used throughout this example (we’ll reference it again when importing the SQL file and in the connector configuration).

      • Click "Create".

  9. On the left-side menu, click "Overview". Click the "Import" button.

    • Source: click "Browse".

      • Double-click your bucket and select the SQL file we saved earlier.

      • Click "SELECT" at the bottom of the page.

    • File format: SQL

    • Destination: database_name

    • Click "Import"

Configuring a new source Connector

  1. Follow the Creating streams documentation in order to create one stream and deploy it onto an environment.
    The name of the stream will be sqlserver_demo.categories.
    The key/value types will be JSON/JSON.

  2. Follow the Configure and install a connector documentation to set up a new Connector-Application.
    Let’s call it my.sqlserver.source.
    The plugin name is io.aiven.connect.jdbc.JdbcSourceConnector.
    If a plugin isn’t available, ask a platform operator to install plugins.

    For advanced configuration, see the official source connector documentation.

  3. Provide the following minimal configuration in order to connect to the previously configured SQL Server instance. A consolidated sketch of this configuration appears at the end of this section.

    connection.url

    jdbc:sqlserver://PASTE_THE_IP_ADDRESS:1433;databaseName=database_name;
    You noted down the instance IP address after creating the SQL Server instance.

    connection.user

    sqlserver

    connection.password
    This is the database password. You noted it down while creating the SQL Server instance.

    Example value:
    1234abcdEFG

    mode

    incrementing

    incrementing.column.name
    The unique primary key of the database table

    category_id

    table.whitelist

    categories
    This value is CASE SENSITIVE

    topic.prefix

    sqlserver_

    value.converter

    org.apache.kafka.connect.json.JsonConverter

    value.converter.schemas.enable
    This property key is not prefilled in the configuration by default, so you’ll have to add it as a custom property.
    Normally, this property is not necessary.
    We enable it here to include the JSON schema in every event. This is wasteful, but it lets us easily produce these events back to SQL Server when configuring the sink connector.

    true

  4. You can now start the source Connector-Application.

  5. You can now use the stream-browse functionality in the Axual Self-Service UI to check the events published by the source Connector to the sqlserver_demo.categories stream.
    This is an example event:

    {
      "key" : null,
      "value": {
        "schema": {
          "name": "categories",
          "type": "struct",
          "optional": false,
          "fields": [
            {
              "field": "category_id",
              "type": "int32",
              "optional": false
            },
            {
              "field": "category_name",
              "type": "string",
              "optional": true
            }
          ]
        },
        "payload": {
          "category_id": 1,
          "category_name": "Names"
        }
      }
    }

    Notice how the "value" contains 2 fields:

    • "Payload" is the actual event. In this case, the first entry in the db table.

    • "Schema" is the database schema, which will be used when converting the json payload to the insert SQL statement later, when configuring the sink connector.

Configuring a new sink Connector

To keep things simple, we’ll consume the topic we just used for the source connector and publish everything into another SQL Server table.
If you did not follow the previous example, you will need to produce an event to the sqlserver_demo.categories topic that includes the database table schema, just as in the example above (see the sketch below).
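
One way to produce such an event by hand is the standard Kafka console producer. A minimal sketch, assuming a client.properties file with the security settings for your environment and the fully resolved topic name on your cluster (both are placeholders here; they depend on your Axual tenant, instance and environment):

  # Connect to your cluster and send one schema-bearing JSON event.
  kafka-console-producer.sh \
    --bootstrap-server <broker-endpoint>:9092 \
    --producer.config client.properties \
    --topic <resolved-topic-name>
  # Paste the "value" object from the example event above as a single line,
  # press Enter, then Ctrl+C to exit.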

  1. Follow the Configure and install a connector documentation to set up a new Connector-Application.
    Let’s call it my.sqlserver.sink.
    The plugin name is io.aiven.connect.jdbc.JdbcSinkConnector.
    If a plugin isn’t available, ask a platform operator to install plugins.

    For advanced configuration, see the official sink connector documentation.

  2. Provide the following minimal configuration in order to connect to the previously configured SQL Server instance. A consolidated sketch of this configuration appears at the end of this section.

    connection.url

    jdbc:sqlserver://PASTE_THE_IP_ADDRESS:1433;databaseName=database_name;
    You noted down the instance IP address after creating the SQL Server instance.

    connection.user

    sqlserver

    connection.password
    This is the database password. You noted it down while creating the SQL Server instance.

    Example value:
    1234abcdEFG

    insert.mode

    upsert

    pk.fields

    category_id

    pk.mode

    record_value
    Instructs the connector to extract the primary key from the Kafka event value.

    table.name.format

    kafka_${topic}
    Instructs the connector to put events into the kafka_sqlserver_demo.categories SQL Server table.

    topics

    sqlserver_demo.categories
    This field is case-sensitive.

    value.converter

    org.apache.kafka.connect.json.JsonConverter

    value.converter.schemas.enable

    true

  3. You can now start the sink Connector-Application.

  4. We’ll now list the contents of the kafka_sqlserver_demo.categories SQL Server table using Google Cloud Shell.
    At the top-right of your screen, next to your picture, are a few buttons.
    One of them is "Activate Cloud Shell". Click it.
    A shell will open at the bottom of your screen.

    1. Copy-Paste the following code-block to connect to your SQL instance:

      gcloud sql connect my-axual-sqlserver-connector-test --user=sqlserver
    2. Click "Authorize" in case you get a pop-up.
      You might also have to type 'y' in the console to enable the Cloud SQL Admin API for your project.

    3. Type your password when prompted.

    4. Now paste the following command to list everything in the table:

      USE database_name;
      SELECT * FROM kafka_sqlserver_demo.categories;
    5. You can now type exit to close the SQL shell or just close the terminal window.
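
For reference, here is the sink configuration from step 2 collected into a single sketch, again written as a plain Kafka Connect config map; in the Axual Self-Service UI you enter the same values field by field. The IP address and password are the placeholders from the instance setup.

  {
    "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:sqlserver://PASTE_THE_IP_ADDRESS:1433;databaseName=database_name;",
    "connection.user": "sqlserver",
    "connection.password": "1234abcdEFG",
    "insert.mode": "upsert",
    "pk.fields": "category_id",
    "pk.mode": "record_value",
    "table.name.format": "kafka_${topic}",
    "topics": "sqlserver_demo.categories",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
  }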

Cleanup

Once you are done, stop the Connector-Application and clean up the unused Axual resources.
If you deployed your resources via Google Cloud, don’t forget to delete your bucket and your SQL instance as well.
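
If you want to do the Google Cloud cleanup from the command line, the following sketch removes the resources created in this guide (using the names from the steps above; gcloud asks for confirmation before deleting the instance):

  # Delete the bucket with the uploaded SQL file, then the SQL Server instance.
  gsutil rm -r gs://sqlserver-init-axual-connect
  gcloud sql instances delete my-axual-sqlserver-connector-test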

License

The SQL Server source and sink Connector is licensed under the Apache License, Version 2.0.

Source code

The source code for the Connect-Plugin can be found at github.com/aiven/jdbc-connector-for-apache-kafka.