JDBC Source Connector — Snowflake
| Field | Value |
|---|---|
| Type | Source |
| Class | `io.aiven.connect.jdbc.JdbcSourceConnector` |
| Target System | Snowflake |
| Maintainer | Aiven |
| License | Apache License 2.0 |
| Project | |
| Download | |
This page documents version 6.12.0. Newer versions should be compatible unless there are breaking changes, but field names or default values may differ. If you notice discrepancies, please contact Axual Support.
Description
The JDBC Source Connector for Snowflake reads rows from a Snowflake table and publishes them as records
to Kafka topics using the Aiven-Open JDBC Source Connector (io.aiven.connect.jdbc.JdbcSourceConnector).
There is no officially supported Snowflake Source Connector. This page describes a proven approach using the generic JDBC Source Connector with additional Snowflake-specific JDBC driver dependencies.
See also: Snowflake Sink Connector for writing Kafka records into Snowflake.
Features
- Poll Snowflake tables and publish rows to Kafka topics
- `timestamp+incrementing` mode for reliable offset tracking across high-throughput workloads
- JSON result format to avoid Apache Arrow memory allocation issues
- UTC timezone handling to prevent offset comparison failures with `TIMESTAMP_NTZ` columns
- Configurable polling interval and batch size
When to Use
- You need to stream rows from a Snowflake table into Kafka topics.
- Near-real-time polling latency is acceptable for your use case.
- Your source table has an auto-incrementing `id` column and a maintained `updated_at` timestamp column.
When NOT to Use
- You need real-time change data capture with sub-second latency — Snowflake does not support CDC via JDBC.
- Your source table lacks an auto-incrementing primary key or a maintained `updated_at` column — the `timestamp+incrementing` mode requires both.
- You need to write data into Snowflake — use the Snowflake Sink Connector instead.
Installation
The JDBC Source Connector plugin is available from Maven Central.
- Navigate to the releases page and select the version matching your Kafka Connect installation.
- Download the JAR file.
For installation steps, see Installing Connector Plugins.
Required additional drivers
The standard JDBC connector does not include the Snowflake JDBC driver or its cryptographic dependencies.
Without them, the connector fails with `java.sql.SQLException: No suitable driver found for jdbc:snowflake://…`.
The following JAR files must be placed manually inside the JDBC connector’s plugin directory
(e.g. ./connect/plugins/aiven-jdbc-connector/):
| JAR file | Purpose | Notes |
|---|---|---|
| | Snowflake JDBC driver (FIPS variant) | The FIPS driver bundles its own internal dependencies. The standard non-FIPS driver may also be used. |
| | Bouncy Castle security provider | Required for cryptographic services. |
| | Bouncy Castle PKIX/operator | Required to prevent … |
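With the drivers in place, the plugin directory typically looks like the sketch below. The names shown follow the public Maven Central artifact IDs (`snowflake-jdbc-fips`, `bc-fips`, `bcpkix-fips`); the exact file names and versions depend on the releases you download, so treat this layout as illustrative, not prescriptive:

```
./connect/plugins/aiven-jdbc-connector/
├── aiven-connect-jdbc-<version>.jar       (the connector plugin itself)
├── snowflake-jdbc-fips-<version>.jar      (Snowflake JDBC driver, FIPS variant)
├── bc-fips-<version>.jar                  (Bouncy Castle security provider)
└── bcpkix-fips-<version>.jar              (Bouncy Castle PKIX/operator)
```

All JARs must sit in the same plugin directory so that the Kafka Connect plugin class loader can resolve the driver and its cryptographic dependencies together.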
Configuration
For the complete configuration reference, see the {hyperlink-official-source-connector-documentation}.
To configure a connector in Axual Self-Service, see Starting Connectors.
TIP: To speed up your deployment, use the Terraform Boilerplate or the Management API Boilerplate.
Key configuration parameters
Several parameters are critical for stable operation with Snowflake and are explained below.
Connection URL
The connection URL must include the following query parameters:
- `JDBC_QUERY_RESULT_FORMAT=JSON` — forces the Snowflake JDBC driver to return results as JSON instead of the Apache Arrow binary format. Arrow requires native memory allocation that often causes out-of-memory or `RootAllocator` errors in Kafka Connect; JSON bypasses this entirely.
- `TIMEZONE=UTC` — tells the Snowflake driver to interpret all non-timezone-aware timestamps as UTC. This prevents offset comparison failures caused by timezone ambiguity in `TIMESTAMP_NTZ` columns.
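Both parameters are easy to drop when assembling the URL by hand. As an illustrative sanity check (not part of the connector; `check_snowflake_url` is a hypothetical helper written for this guide), the required query parameters can be verified with Python's standard library:

```python
from urllib.parse import urlparse, parse_qs

def check_snowflake_url(url: str) -> list[str]:
    """Return the required query parameters missing (or wrong) in a
    Snowflake JDBC connection URL."""
    # Strip the "jdbc:" prefix so urlparse sees a normal URL with a query string.
    query = urlparse(url.removeprefix("jdbc:")).query
    params = parse_qs(query)
    required = {"JDBC_QUERY_RESULT_FORMAT": "JSON", "TIMEZONE": "UTC"}
    return [key for key, value in required.items() if params.get(key) != [value]]

url = ("jdbc:snowflake://myorg-sf7890.snowflakecomputing.com/"
       "?warehouse=COMPUTE_WH&db=ANALYTICS_DB&schema=PUBLIC"
       "&JDBC_QUERY_RESULT_FORMAT=JSON&TIMEZONE=UTC")
print(check_snowflake_url(url))  # -> []  (nothing missing)
```

An empty result means both Snowflake-specific parameters are present with the expected values.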
Polling mode
`mode=timestamp+incrementing` uses two columns to track offset progress:
- `timestamp.column.name` — tracks temporal progress; records older than `timestamp.delay.interval.ms` are eligible for polling.
- `incrementing.column.name` — acts as a tie-breaker when multiple rows share the same timestamp, ensuring no records are skipped.
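As a simplified model of this selection rule (the real connector expresses it as a SQL WHERE clause; the `id` and `updated_at` names follow the examples in this guide), the eligibility logic can be sketched in Python:

```python
from dataclasses import dataclass

@dataclass
class Offset:
    last_ts: int   # committed timestamp offset (epoch ms, from updated_at)
    last_id: int   # committed incrementing offset (from id)

def eligible(rows, offset, now_ms, delay_ms=5000):
    """Rows the connector would poll next in timestamp+incrementing mode."""
    picked = [
        r for r in rows
        if r["updated_at"] <= now_ms - delay_ms          # outside the delay grace window
        and (r["updated_at"] > offset.last_ts            # strictly newer timestamp, or
             or (r["updated_at"] == offset.last_ts       # same timestamp where the
                 and r["id"] > offset.last_id))          # incrementing id breaks the tie
    ]
    # Processed in (updated_at, id) order so the committed offset advances monotonically.
    return sorted(picked, key=lambda r: (r["updated_at"], r["id"]))
```

For example, with a committed offset of `(last_ts=100, last_id=1)`, a second row sharing timestamp 100 but with `id=2` is still picked up — which is exactly the tie-breaking behaviour described above.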
Time precision
`time.precision.mode=adaptive_time_storage` is recommended for Snowflake. It instructs the connector to pass the database’s native timestamp string directly into the SQL WHERE clause instead of converting it to a Java millisecond epoch. This avoids precision loss: Snowflake supports nanosecond timestamps, while Java milliseconds offer only three decimal places of sub-second precision. Any sub-millisecond truncation can cause the connector to miss or repeat records at offset boundaries.
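A small Python illustration of the failure mode: two nanosecond-precision `TIMESTAMP_NTZ` values that differ only below the millisecond collapse to the same value after conversion to epoch milliseconds, so the connector could no longer order them at an offset boundary (the timestamps below are arbitrary examples):

```python
# Two nanosecond timestamps that differ only in sub-millisecond digits.
a_ns = 1_700_000_000_123_456_789
b_ns = 1_700_000_000_123_999_999

# Converting to Java-style epoch milliseconds truncates everything below 1 ms.
a_ms = a_ns // 1_000_000
b_ms = b_ns // 1_000_000

print(a_ns != b_ns)   # True  — the rows are distinct in Snowflake
print(a_ms == b_ms)   # True  — but indistinguishable after truncation
```

Passing the native timestamp string through, as `adaptive_time_storage` does, sidesteps this truncation entirely.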
Source table requirements
For reliable operation, the source Snowflake table must follow these design principles:
| Column | Requirement | Rationale |
|---|---|---|
| `id` | Unique, auto-incrementing numeric column | Used as the tie-breaker in `timestamp+incrementing` mode. |
| `updated_at` | Maintained timestamp column | Tracks temporal progress. |
Getting Started
Prerequisites
Snowflake instance
You need an active Snowflake subscription with:
- A dedicated service user and a role (`SF_ROLE`) with SELECT access on the source table.
- A source table containing an auto-incrementing `id` column and an `updated_at` timestamp column.
Additional driver JARs
Install the three required driver JARs into the JDBC connector plugin directory before starting the connector. See Required additional drivers above.
Axual stream
The stream where the connector will produce events must already exist in Axual Self-Service. See Creating streams if you need to create it.
Steps
Step 1 — Create a connector application
- Follow the Configure and install a connector documentation to set up a new Connector-Application. Let’s call it {application-name}.
- The plugin name is `io.aiven.connect.jdbc.JdbcSourceConnector`.
- If the plugin isn’t available, ask a platform operator to install plugins.
Step 2 — Configure the connector
- Provide the following minimal configuration:

| Key | Value |
|---|---|
| `connector.class` | `io.aiven.connect.jdbc.JdbcSourceConnector` |
| `connection.url` | `jdbc:snowflake://<account>.snowflakecomputing.com/?warehouse=<warehouse>&db=<database>&schema=<schema>&role=<role>&JDBC_QUERY_RESULT_FORMAT=JSON&TIMEZONE=UTC` |
| `connection.user` | `<snowflake-username>` |
| `connection.password` | `<snowflake-password>` |
| `mode` | `timestamp+incrementing` |
| `incrementing.column.name` | `id` |
| `timestamp.column.name` | `updated_at` |
| `time.precision.mode` | `adaptive_time_storage` |
| `query` | `select * from "<table-name>"` |
| `topic.prefix` | `<topic_prefix>` |

For advanced options, see the {hyperlink-official-source-connector-documentation}.
Known limitations
- There is no official Snowflake Source Connector. This setup relies on the generic JDBC Source Connector with manually installed additional drivers.
- The source table must have a unique auto-incrementing `id` column and a maintained `updated_at` timestamp column. Tables without these columns are not supported.
- This connector uses polling — it does not provide real-time change detection. Polling latency is controlled by `table.poll.interval.ms`.
- Schema evolution is not supported. Changes to the source table schema require manual reconfiguration.
- The Apache Arrow result format is not supported in Kafka Connect environments. `JDBC_QUERY_RESULT_FORMAT=JSON` must always be set.
Examples
Minimal configuration
```json
{
  "name": "snowflake-jdbc-source",
  "config": {
    "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:snowflake://myorg-sf7890.snowflakecomputing.com/?warehouse=COMPUTE_WH&db=ANALYTICS_DB&schema=PUBLIC&role=KAFKA_ROLE&JDBC_QUERY_RESULT_FORMAT=JSON&TIMEZONE=UTC",
    "connection.user": "svc_kafka_connect",
    "connection.password": "Snowflake@Connect1",
    "mode": "timestamp+incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "updated_at",
    "time.precision.mode": "adaptive_time_storage",
    "query": "select * from \"EVENTS\"",
    "topic.prefix": "snowflake_"
  }
}
```
Full configuration
```json
{
  "name": "snowflake-jdbc-source",
  "config": {
    "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:snowflake://myorg-sf7890.snowflakecomputing.com/?warehouse=COMPUTE_WH&db=ANALYTICS_DB&schema=PUBLIC&role=KAFKA_ROLE&JDBC_QUERY_RESULT_FORMAT=JSON&TIMEZONE=UTC",
    "connection.user": "svc_kafka_connect",
    "connection.password": "Snowflake@Connect1",
    "connection.attempts": "100",
    "connection.backoff.ms": "300000",
    "mode": "timestamp+incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "updated_at",
    "timestamp.delay.interval.ms": "5000",
    "time.precision.mode": "adaptive_time_storage",
    "query": "select * from \"EVENTS\"",
    "table.types": "table",
    "table.poll.interval.ms": "60000",
    "batch.max.rows": "100",
    "numeric.mapping": "best_fit",
    "numeric.precision.mapping": "false",
    "quote.sql.identifiers": "ALWAYS",
    "db.timezone": "UTC",
    "topic.prefix": "snowflake_",
    "transforms": "createKey",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "id",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "header.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
```
License
The JDBC Source Connector is licensed under the Apache License, Version 2.0.