JDBC Source Connector — Snowflake

Type

Source

Class

io.aiven.connect.jdbc.JdbcSourceConnector

Target System

Snowflake

Maintainer

Aiven

License

Apache License 2.0

Project

github.com/aiven/jdbc-connector-for-apache-kafka

Download

Maven Central

This page documents version 6.12.0. Newer versions are expected to remain compatible, but field names or default values may differ between releases. If you notice discrepancies, please contact Axual Support.

Description

The JDBC Source Connector for Snowflake reads rows from a Snowflake table and publishes them as records to Kafka topics using the Aiven-Open JDBC Source Connector (io.aiven.connect.jdbc.JdbcSourceConnector).

There is no officially supported Snowflake Source Connector. This page describes a proven approach using the generic JDBC Source Connector with additional Snowflake-specific JDBC driver dependencies.

See also: Snowflake Sink Connector for writing Kafka records into Snowflake.

Features

  • Poll Snowflake tables and publish rows to Kafka topics

  • timestamp+incrementing mode for reliable offset tracking across high-throughput workloads

  • JSON result format to avoid Apache Arrow memory allocation issues

  • UTC timezone handling to prevent offset comparison failures with TIMESTAMP_NTZ columns

  • Configurable polling interval and batch size

When to Use

  • You need to stream rows from a Snowflake table into Kafka topics.

  • Near-real-time polling latency is acceptable for your use case.

  • Your source table has an auto-incrementing id column and a maintained updated_at timestamp column.

When NOT to Use

  • You need real-time change data capture with sub-second latency — Snowflake does not support CDC via JDBC.

  • Your source table lacks an auto-incrementing primary key or a maintained updated_at column — the timestamp+incrementing mode requires both.

  • You need to write data into Snowflake — use the Snowflake Sink Connector instead.

Installation

The JDBC Source Connector plugin is available from Maven Central.

  1. Navigate to the releases page and select the version matching your Kafka Connect installation.

  2. Download the JAR file.

For installation steps, see Installing Connector Plugins.

Required additional drivers

The standard JDBC connector does not include the Snowflake JDBC driver or its cryptographic dependencies. Without them, the connector fails with java.sql.SQLException: No suitable driver found for jdbc:snowflake://….

The following JAR files must be placed manually inside the JDBC connector’s plugin directory (e.g. ./connect/plugins/aiven-jdbc-connector/); a sample directory layout follows the list:

  • snowflake-jdbc-fips-3.27.1.jar: Snowflake JDBC driver (FIPS variant). The FIPS driver bundles its own internal dependencies; the standard non-FIPS driver may also be used.

  • bcprov-jdk15on-1.70.jar: Bouncy Castle security provider. Required for cryptographic services; the jdk15on variant is mandatory for JDK 8/11 compatibility.

  • bcpkix-jdk15on-1.70.jar: Bouncy Castle PKIX/operator classes. Required to prevent java.lang.NoClassDefFoundError during Snowflake driver initialization.
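
For illustration, a plugin directory containing the connector and all three driver JARs might look as follows. The connector JAR name is an assumption based on the Maven artifact name; match it to the version you downloaded:

./connect/plugins/aiven-jdbc-connector/
    jdbc-connector-for-apache-kafka-6.12.0.jar
    snowflake-jdbc-fips-3.27.1.jar
    bcprov-jdk15on-1.70.jar
    bcpkix-jdk15on-1.70.jar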

Configuration

For the complete configuration reference, see the official JDBC Source Connector documentation.

To configure a connector in Axual Self-Service, see Starting Connectors.

TIP: To speed up your deployment, use the Terraform Boilerplate or the Management API Boilerplate.

Key configuration parameters

Several parameters are critical for stable operation with Snowflake and are explained below.

Connection URL

The connection URL must include the following query parameters:

JDBC_QUERY_RESULT_FORMAT=JSON

Forces the Snowflake JDBC driver to return results as JSON instead of Apache Arrow binary format. Arrow requires native memory allocation that often causes out-of-memory or RootAllocator errors in Kafka Connect. JSON bypasses this entirely.

TIMEZONE=UTC

Tells the Snowflake driver to interpret all non-timezone-aware timestamps as UTC. This prevents offset comparison failures caused by timezone ambiguity in TIMESTAMP_NTZ columns.
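
Put together, a connection URL carrying both parameters has the following shape (replace the angle-bracket placeholders with your own values):

jdbc:snowflake://<account>.snowflakecomputing.com/?warehouse=<warehouse>&db=<database>&schema=<schema>&role=<role>&JDBC_QUERY_RESULT_FORMAT=JSON&TIMEZONE=UTC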

Polling mode

mode=timestamp+incrementing uses two columns to track offset progress:

  • timestamp.column.name — tracks temporal progress; records older than timestamp.delay.interval.ms are eligible for polling.

  • incrementing.column.name — acts as a tie-breaker when multiple rows share the same timestamp, ensuring no records are skipped. A sketch of the resulting query follows this list.
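
Conceptually, each poll then issues a query of the following shape. This is a sketch based on the behaviour of the upstream JDBC source connector; the exact SQL generated at runtime may differ, and EVENTS is the example table used elsewhere on this page:

SELECT * FROM "EVENTS"
WHERE "updated_at" < ?                    -- upper bound: now minus timestamp.delay.interval.ms
  AND (("updated_at" = ? AND "id" > ?)    -- same timestamp as the stored offset: tie-break on id
    OR "updated_at" > ?)                  -- or any strictly newer timestamp
ORDER BY "updated_at", "id" ASC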

Time precision

time.precision.mode=adaptive_time_storage is recommended for Snowflake. It instructs the connector to pass the database’s native timestamp string directly into the SQL WHERE clause instead of converting it to a Java millisecond epoch. This avoids precision loss: Snowflake timestamps carry up to nanosecond precision (nine fractional digits), while a Java epoch-millisecond value keeps only three. Any sub-millisecond truncation can cause the connector to miss or repeat records at offset boundaries.
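
For example (illustrative values), converting nanosecond-precision timestamps to epoch milliseconds discards the last six fractional digits, so two distinct rows can collapse onto the same offset:

-- stored in Snowflake with nanosecond precision
2024-06-01 12:00:00.123456789
2024-06-01 12:00:00.123999999
-- both become the same value after conversion to Java epoch milliseconds
2024-06-01 12:00:00.123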

Source table requirements

For reliable operation, the source Snowflake table must follow these design principles; a DDL sketch follows the list:

  • id: a unique, auto-incrementing numeric column. Used as the tie-breaker in timestamp+incrementing mode; must always increase on insertion.

  • updated_at: a TIMESTAMP column that is updated on every row modification. Tracks temporal progress. TIMESTAMP_LTZ is preferred; if using TIMESTAMP_NTZ, ensure TIMEZONE=UTC is set in the connection URL.
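
A minimal sketch of a compatible table, using the column names assumed throughout this page; the EVENTS table name and payload column are illustrative stand-ins for your own schema. Note that Snowflake does not refresh updated_at automatically on UPDATE, so every modification must set it explicitly:

CREATE TABLE EVENTS (
    id         NUMBER AUTOINCREMENT,                       -- unique, always increasing
    payload    VARCHAR,                                    -- your own data columns
    updated_at TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()   -- set on insert only
);

-- updated_at must be maintained explicitly on every modification:
UPDATE EVENTS
SET payload = 'new value', updated_at = CURRENT_TIMESTAMP()
WHERE id = 42;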

Getting Started

Prerequisites

Snowflake instance

You need an active Snowflake subscription with the following (an example setup script follows this list):

  • A dedicated service user and a role (SF_ROLE) with SELECT access on the source table.

  • A source table containing an auto-incrementing id column and an updated_at timestamp column.
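
A sketch of the corresponding Snowflake setup, reusing the SF_ROLE name from above; the database, schema, warehouse, table, and user names are the illustrative ones from the examples further down this page:

CREATE ROLE SF_ROLE;
GRANT USAGE ON DATABASE ANALYTICS_DB TO ROLE SF_ROLE;
GRANT USAGE ON SCHEMA ANALYTICS_DB.PUBLIC TO ROLE SF_ROLE;
GRANT USAGE ON WAREHOUSE COMPUTE_WH TO ROLE SF_ROLE;
GRANT SELECT ON TABLE ANALYTICS_DB.PUBLIC.EVENTS TO ROLE SF_ROLE;

CREATE USER svc_kafka_connect PASSWORD = '<snowflake-password>' DEFAULT_ROLE = SF_ROLE;
GRANT ROLE SF_ROLE TO USER svc_kafka_connect;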

Additional driver JARs

Install the three required driver JARs into the JDBC connector plugin directory before starting the connector. See Required additional drivers above.

Axual stream

The stream where the connector will produce events must already exist in Axual Self-Service. See Creating streams if you need to create it.

Steps

Step 1 — Create a connector application

  1. Follow the Configure and install a connector documentation to set up a new Connector-Application.
    Let’s call it {application-name}.
    The plugin name is io.aiven.connect.jdbc.JdbcSourceConnector.
    If the plugin isn’t available, ask a platform operator to install it.

Step 2 — Configure the connector

  1. Provide the following minimal configuration:

    connector.class

    io.aiven.connect.jdbc.JdbcSourceConnector

    connection.url

    jdbc:snowflake://<account>.snowflakecomputing.com/?warehouse=<warehouse>&db=<database>&schema=<schema>&role=<role>&JDBC_QUERY_RESULT_FORMAT=JSON&TIMEZONE=UTC

    connection.user

    <snowflake-username>

    connection.password

    <snowflake-password>

    mode

    timestamp+incrementing

    incrementing.column.name

    id

    timestamp.column.name

    updated_at

    time.precision.mode

    adaptive_time_storage

    query

    select * from "<table-name>"

    topic.prefix

    <topic_prefix>

    For advanced options, see the official JDBC Source Connector documentation.

Step 3 — Start the connector

Start the connector application from Axual Self-Service.

Step 4 — Verify

In Axual Self-Service, use stream-browse on your target stream to confirm events are arriving.
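
If no events appear, generate a test record by touching the source table (an illustrative statement against the example EVENTS table):

-- a fresh row picks up id and updated_at automatically on insert
INSERT INTO EVENTS (payload) VALUES ('connector smoke test');

The row becomes eligible for polling once it is older than timestamp.delay.interval.ms, so expect it on the stream within roughly that delay plus one polling interval.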

Cleanup

When you are done:

  1. Stop the connector application in Axual Self-Service.

  2. Remove stream access for the application if no longer needed.

Known limitations

  • There is no official Snowflake Source Connector. This setup relies on the generic JDBC Source Connector with manually installed additional drivers.

  • The source table must have a unique auto-incrementing id column and a maintained updated_at timestamp column. Tables without these columns are not supported.

  • This connector uses polling — it does not provide real-time change detection. The interval between data polls is controlled by poll.interval.ms; table.poll.interval.ms only controls how often the connector checks for new or removed tables.

  • Schema evolution is not supported. Changes to the source table schema require manual reconfiguration.

  • The Apache Arrow result format is not usable in Kafka Connect environments because of its native memory allocation requirements. JDBC_QUERY_RESULT_FORMAT=JSON must always be set.

Examples

Minimal configuration

{
  "name": "snowflake-jdbc-source",
  "config": {
    "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:snowflake://myorg-sf7890.snowflakecomputing.com/?warehouse=COMPUTE_WH&db=ANALYTICS_DB&schema=PUBLIC&role=KAFKA_ROLE&JDBC_QUERY_RESULT_FORMAT=JSON&TIMEZONE=UTC",
    "connection.user": "svc_kafka_connect",
    "connection.password": "Snowflake@Connect1",
    "mode": "timestamp+incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "updated_at",
    "time.precision.mode": "adaptive_time_storage",
    "query": "select * from \"EVENTS\"",
    "topic.prefix": "snowflake_"
  }
}

Full configuration

{
  "name": "snowflake-jdbc-source",
  "config": {
    "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:snowflake://myorg-sf7890.snowflakecomputing.com/?warehouse=COMPUTE_WH&db=ANALYTICS_DB&schema=PUBLIC&role=KAFKA_ROLE&JDBC_QUERY_RESULT_FORMAT=JSON&TIMEZONE=UTC",
    "connection.user": "svc_kafka_connect",
    "connection.password": "Snowflake@Connect1",
    "connection.attempts": "100",
    "connection.backoff.ms": "300000",
    "mode": "timestamp+incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "updated_at",
    "timestamp.delay.interval.ms": "5000",
    "time.precision.mode": "adaptive_time_storage",
    "query": "select * from \"EVENTS\"",
    "table.types": "table",
    "table.poll.interval.ms": "60000",
    "batch.max.rows": "100",
    "numeric.mapping": "best_fit",
    "numeric.precision.mapping": "false",
    "quote.sql.identifiers": "ALWAYS",
    "db.timezone": "UTC",
    "topic.prefix": "snowflake_",
    "transforms": "createKey",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "id",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "header.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}

License

The Aiven JDBC Connector for Apache Kafka is licensed under the Apache License, Version 2.0.