Snowflake sink Connector, version 1.4.4

Snowflake

If you already have a publicly accessible Snowflake deployment, you can skip ahead and Configure and install a connector using the Axual Self-Service UI, following the configuration instructions below.
If you don’t have one available, the following section shows you how to set up a publicly accessible Snowflake instance.

Deploying a Snowflake instance

  1. You will authenticate against Snowflake using key-pair authentication.
    Generate a key pair in your terminal. There are multiple ways to create an RSA key, but Snowflake requires a key of at least 2048 bits with the private key in PKCS#8 format, which is what the commands below produce.
    For additional information, check Snowflake Key-Pair authentication.

    openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out MY_SNOWFLAKE.key -nocrypt
    openssl rsa -in MY_SNOWFLAKE.key -pubout -out MY_SNOWFLAKE.cert
  2. For each file, remove the first and last lines (the -----BEGIN and -----END markers) and join the remaining lines into a single line.
    The following commands do just that.

    sed '1d;$d' MY_SNOWFLAKE.key  | tr -d '\n' > MY_SNOWFLAKE_COMPACT.key
    sed '1d;$d' MY_SNOWFLAKE.cert | tr -d '\n' > MY_SNOWFLAKE_COMPACT.cert

    The contents of MY_SNOWFLAKE_COMPACT.key will be used later, when configuring the Connector-Application, as the snowflake.private.key configuration value.
    The contents of MY_SNOWFLAKE_COMPACT.cert will be used shortly, when configuring your Snowflake credentials.

  3. Sign up with Snowflake to create a new trial account.
    Select a standard account type and any cloud provider when prompted.
    After activating your account by confirming your e-mail address, you should receive a second e-mail which contains your Account URL.

  4. Log in to your Snowflake account and click the Worksheets button on the left-side menu.
    Click the + Worksheet button at the top-right of the screen to create a new worksheet.

  5. Paste the following statements in the worksheet. Do not execute anything yet.

    -- For the purposes of this demo, we'll use the admin role
    USE ROLE ACCOUNTADMIN;
    
    -- Create a warehouse to execute the insert statements triggered by Kafka Connect
    CREATE WAREHOUSE SF_WAREHOUSE WITH
       WAREHOUSE_SIZE = 'XSMALL'
       WAREHOUSE_TYPE = 'STANDARD'
       AUTO_SUSPEND = 300
       AUTO_RESUME = TRUE
    ;
    
    -- Create snowflake resources
    CREATE DATABASE SF_DATABASE;
    CREATE SCHEMA SF_DATABASE.SF_SCHEMA;
    CREATE TABLE SF_DATABASE.SF_SCHEMA.SF_TABLE (
       RECORD_METADATA VARIANT,
       RECORD_CONTENT  VARIANT
    );
    
    -- Create a role with permission to use the above resources
    -- The privileges required by the Kafka connector are listed here:
    -- https://docs.snowflake.com/en/user-guide/kafka-connector-install.html#required-privileges
    CREATE ROLE SF_ROLE;
    
    GRANT USAGE ON WAREHOUSE SF_WAREHOUSE
       TO ROLE SF_ROLE;
    GRANT USAGE ON DATABASE SF_DATABASE
       TO ROLE SF_ROLE;
    GRANT USAGE ON SCHEMA SF_DATABASE.SF_SCHEMA
       TO ROLE SF_ROLE;
    GRANT CREATE STAGE ON SCHEMA SF_DATABASE.SF_SCHEMA
       TO ROLE SF_ROLE;
    GRANT CREATE PIPE ON SCHEMA SF_DATABASE.SF_SCHEMA
       TO ROLE SF_ROLE;
    GRANT OWNERSHIP ON TABLE SF_DATABASE.SF_SCHEMA.SF_TABLE
       TO ROLE SF_ROLE;
  6. Append the following statements to the same worksheet.
    Replace YOUR_USERNAME with your Snowflake username (the one you log in with).
    Replace the example RSA_PUBLIC_KEY value with the contents of MY_SNOWFLAKE_COMPACT.cert.

    GRANT ROLE SF_ROLE
       TO USER YOUR_USERNAME;
    ALTER USER YOUR_USERNAME SET DEFAULT_ROLE = SF_ROLE;
    ALTER USER YOUR_USERNAME SET RSA_PUBLIC_KEY='MIIBIj...IDAQAB';
  7. Select the entire worksheet contents via Right Click → Select All or Ctrl+A (Cmd+A on macOS).
    This is necessary because, when you run a worksheet, only the selected statements are executed.
    With all lines selected, click the Play button at the top-right of the screen to execute all the commands in the worksheet.

  8. On the left side of the screen, click the + icon to create another worksheet.
    Paste the following contents and execute the worksheet to retrieve your "Account locator URL":

    SELECT t.value:host::varchar AS ACCOUNT_LOCATOR
    FROM TABLE (flatten(input => parse_json(system$whitelist()))) AS t
    WHERE t.value:type = 'SNOWFLAKE_DEPLOYMENT';

    We’ll use this URL as the snowflake.url.name property when configuring the Connector-Application later.
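The key handling in steps 1, 2 and 6 above can be scripted. The following is a minimal sketch, assuming a POSIX shell with OpenSSL on the PATH; run it in an empty working directory, since it overwrites files with these names. On top of the original commands, it checks that the compact private key still matches the compact public key, and prints the step 6 ALTER USER statement for you to paste into the worksheet. SF_USER is a hypothetical variable holding your Snowflake username; the script does not contact Snowflake itself.

```shell
#!/bin/sh
# Sketch of steps 1, 2 and 6: generate the key pair, compact it, check it,
# and print the ALTER USER statement. Assumes OpenSSL on the PATH.
# SF_USER is a hypothetical variable holding your Snowflake username.
set -eu
SF_USER="${SF_USER:-YOUR_USERNAME}"

# Step 1: unencrypted 2048-bit RSA private key (PKCS#8 PEM) and its public key
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out MY_SNOWFLAKE.key -nocrypt
openssl rsa -in MY_SNOWFLAKE.key -pubout -out MY_SNOWFLAKE.cert

# Step 2: drop the BEGIN/END lines and join the base64 body onto one line
sed '1d;$d' MY_SNOWFLAKE.key  | tr -d '\n' > MY_SNOWFLAKE_COMPACT.key
sed '1d;$d' MY_SNOWFLAKE.cert | tr -d '\n' > MY_SNOWFLAKE_COMPACT.cert

# Check: rebuild a PEM key from the compact private key (64-char lines),
# derive its public key, and compare it with the compact public key
derived=$(
  {
    echo '-----BEGIN PRIVATE KEY-----'
    fold -w 64 MY_SNOWFLAKE_COMPACT.key
    echo
    echo '-----END PRIVATE KEY-----'
  } | openssl pkey -pubout | sed '1d;$d' | tr -d '\n'
)
if [ "$derived" != "$(cat MY_SNOWFLAKE_COMPACT.cert)" ]; then
  echo "The compact private and public keys do not match" >&2
  exit 1
fi

# Step 6: print the statement to paste into the worksheet
printf "ALTER USER %s SET RSA_PUBLIC_KEY='%s';\n" "$SF_USER" "$(cat MY_SNOWFLAKE_COMPACT.cert)"
```

For example, `SF_USER=JOHN sh keys.sh` (with the script saved under the hypothetical name keys.sh) prints a ready-to-paste statement for the user JOHN. The round-trip check is cheap and catches a compact key that was truncated or edited by accident before it is used in the connector configuration.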

Configuring a new sink Connector

  1. Follow the Creating streams documentation to create one stream and deploy it onto an environment.
    Name the stream my_snowflake_stream.
    Use String/String as its key/value types.

  2. Follow the Configure and install a connector documentation to set up a new Connector-Application.
    Let’s call it my_snowflake_sink.
    The plugin name is com.snowflake.kafka.connector.SnowflakeSinkConnector.
    If the plugin isn’t available, ask a platform operator to install it.

  3. Produce some String/String events to the my_snowflake_stream stream. You can follow the producer documentation and examples.

  4. Provide the following minimal configuration to connect to the previously configured Snowflake instance.
    For advanced configuration, see the official sink connector documentation.

    Property                           Value
    topics                             my_snowflake_stream
    key.converter                      org.apache.kafka.connect.storage.StringConverter
    value.converter                    org.apache.kafka.connect.storage.StringConverter
    snowflake.database.name            SF_DATABASE
    snowflake.schema.name              SF_SCHEMA
    snowflake.role.name                SF_ROLE
    snowflake.topic2table.map          my_snowflake_stream:SF_TABLE
    snowflake.user.name                Your Snowflake username (the one you log in with), not your e-mail address.
                                       Example value: JOHN
    snowflake.url.name                 The account locator URL you retrieved with the second worksheet.
                                       Example value: ab12345.europe-west4.gcp.snowflakecomputing.com
    snowflake.private.key              The private key you generated at the beginning: the contents of MY_SNOWFLAKE_COMPACT.key.
                                       Example value: MIIEvgIBAD…WQ9zu9
    snowflake.private.key.passphrase   Leave this field empty (no value): the private key was generated unencrypted, so no passphrase is needed.
                                       If the field contains the word [hidden] by default, delete it.

  5. Authorize the my_snowflake_sink sink Connector-Application to consume the my_snowflake_stream stream.

  6. You can now start the sink Connector-Application.

  7. Let’s find the events in Snowflake.

    • Go to the Snowflake Homepage.

    • On the left-side menu, select Data → Databases

    • A new menu appears on the right. Expand SF_DATABASE → SF_SCHEMA → Tables → SF_TABLE

    • The table menu opens. Click the Data Preview tab.

    • Click the Select Warehouse button right below, and select SF_WAREHOUSE.

      Snowflake UI, in the "Data Preview" tab of a selected table

      If nothing is displayed, click the Refresh icon in the top-right corner of the page. You might have to wait up to 2 minutes, because the Snowflake connector buffers records and sends them in batches (see the connector’s buffer.flush.time setting).
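The configuration from step 4 is entered in the Self-Service UI, but you may want to keep a copy of it, for example to review it or to recreate the Connector-Application later. The following is a hypothetical sketch that writes the same properties to a Java properties file. The function name write_sink_config, the SF_USER and SF_URL variables and the output file name are assumptions, not part of Axual or of the connector; the defaults are the example values from step 4.

```shell
#!/bin/sh
# Sketch: write the minimal configuration from step 4 to a properties file.
# write_sink_config, SF_USER and SF_URL are hypothetical names; the defaults
# below are the example values from step 4.
set -eu

write_sink_config() {
  key_file=$1   # the single-line private key, e.g. MY_SNOWFLAKE_COMPACT.key
  out_file=$2   # the properties file to create

  # The private key must exist and must not still contain PEM marker lines
  if [ ! -s "$key_file" ]; then
    echo "missing or empty key file: $key_file" >&2
    return 1
  fi
  if grep -q -- '-----BEGIN' "$key_file"; then
    echo "$key_file still contains a PEM header; use the compact key" >&2
    return 1
  fi

  cat > "$out_file" <<EOF
topics=my_snowflake_stream
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
snowflake.database.name=SF_DATABASE
snowflake.schema.name=SF_SCHEMA
snowflake.role.name=SF_ROLE
snowflake.topic2table.map=my_snowflake_stream:SF_TABLE
snowflake.user.name=${SF_USER:-JOHN}
snowflake.url.name=${SF_URL:-ab12345.europe-west4.gcp.snowflakecomputing.com}
snowflake.private.key=$(cat "$key_file")
# Intentionally empty: the key was generated unencrypted (-nocrypt)
snowflake.private.key.passphrase=
EOF
}
```

For example, `write_sink_config MY_SNOWFLAKE_COMPACT.key my_snowflake_sink.properties` writes the file in the current directory. Treat the resulting file as a secret, because it contains the private key.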

Cleanup

Once you are done, stop the Connector-Application and clean up the unused Axual resources.
To clean up your Snowflake resources, run the following in a worksheet (don’t forget to edit YOUR_USERNAME):

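-- SF_ROLE is now your default role, so switch back to the admin role first
USE ROLE ACCOUNTADMIN;
DROP DATABASE SF_DATABASE;
ALTER USER YOUR_USERNAME SET DEFAULT_ROLE = ACCOUNTADMIN;
-- Remove the public key registered for the connector
ALTER USER YOUR_USERNAME UNSET RSA_PUBLIC_KEY;
DROP ROLE SF_ROLE;
DROP WAREHOUSE SF_WAREHOUSE;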

You can now also delete all your worksheets.
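The key files created at the beginning of this guide also remain on your machine. The private key is a credential for your Snowflake user for as long as the matching public key is set on that user, so you may want to delete these files as well. A minimal sketch, assuming the files are still in the current directory:

```shell
#!/bin/sh
# Sketch: delete the local key files created in steps 1 and 2 of
# "Deploying a Snowflake instance". Assumes they are in the current directory;
# rm -f does not complain about files that are already gone.
set -eu
rm -f -- MY_SNOWFLAKE.key MY_SNOWFLAKE.cert \
         MY_SNOWFLAKE_COMPACT.key MY_SNOWFLAKE_COMPACT.cert
```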

License

The Snowflake sink Connector is licensed under the Apache License, Version 2.0.

Source code

The source code for the Connect-Plugin can be found at github.com/snowflakedb/snowflake-kafka-connector.