Salesforce SObject Sink Connector
| Type | Sink |
|---|---|
| Class | `io.axual.connect.plugins.salesforce.SalesforceSinkConnector` |
| Target System | CRM (Salesforce SObject) |
| Maintainer | Axual |
| License | Proprietary (client-only) |
| Project | — |
| Download | Contact Axual Support to obtain the connector library (private repository). |
Description
The Salesforce SObject Sink Connector writes records from Kafka topics directly into Salesforce SObjects using the Salesforce REST API.
It supports any standard or custom SObject (e.g. Contact, Account, MyObject__c) and four write operations: INSERT, UPDATE, DELETE, and UPSERT.
Two machine-to-machine OAuth 2.0 authentication flows are supported: Client Credentials and JWT Bearer (RFC 7523).
Features
- Write Kafka records to any standard or custom Salesforce SObject
- Four write operations: INSERT, UPDATE, DELETE, and UPSERT
- Two authentication modes: OAuth 2.0 Client Credentials and JWT Bearer (RFC 7523)
- Idempotent writes via external ID fields (UPSERT)
- Header-driven operation routing
- Tombstone record support for DELETE operations
- Automatic OAuth token refresh with a proactive two-minute buffer
- Dead Letter Queue (DLQ) support for non-recoverable errors via Kafka Connect’s `ErrantRecordReporter`
- Exponential backoff retry strategy for transient errors
- Startup validation of OAuth connectivity and external ID field existence
- API usage limit awareness with warnings at low quota thresholds
- Optional gzip HTTP compression for large payloads
- Logical type conversion for Date, Timestamp, Time, and Decimal Kafka fields
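The logical type conversion in the last feature can be pictured with a small sketch. The target formats below (ISO 8601 date/time strings, decimals rendered as strings) are assumptions based on what the Salesforce REST API accepts; the connector's exact serialization is not documented here.

```python
from datetime import date, datetime, time, timezone
from decimal import Decimal

# Hypothetical sketch of rendering Kafka Connect logical types into JSON
# values the Salesforce REST API accepts. The exact formatting used by the
# connector is an assumption, not taken from its source.
def to_salesforce_value(value):
    if isinstance(value, datetime):            # Timestamp -> ISO 8601 with offset
        if value.tzinfo is None:
            value = value.replace(tzinfo=timezone.utc)  # assume UTC if naive
        return value.isoformat()
    if isinstance(value, date):                # Date -> yyyy-mm-dd
        return value.isoformat()
    if isinstance(value, time):                # Time -> HH:MM:SS
        return value.isoformat()
    if isinstance(value, Decimal):             # Decimal -> string, no precision loss
        return str(value)
    return value                               # strings, numbers, booleans pass through

print(to_salesforce_value(date(2024, 5, 1)))   # 2024-05-01
print(to_salesforce_value(Decimal("19.99")))   # 19.99
```

Note the `datetime` check must come before the `date` check, since every `datetime` is also a `date` in Python.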
When to Use
- You need to write Kafka records into Salesforce SObjects in real time (e.g. syncing orders, contacts, or custom objects).
- You require idempotent writes using Salesforce External ID fields (UPSERT).
- You are consuming a CDC or mixed-operation stream and need per-record operation routing via Kafka headers.
- You want a secure, secret-free authentication setup using the JWT Bearer flow.
When NOT to Use
- You need to read from Salesforce — consider the Salesforce PubSub API Source Connector instead.
- Your topic has high message throughput. This connector issues one Salesforce REST API request per record for INSERT and UPSERT operations. UPDATE and DELETE in External ID mode require an additional GET request to resolve the native record ID, though an in-memory cache reduces this to one request for records with a previously seen external ID within the same task lifetime. Salesforce enforces a 24-hour rolling API request quota per organisation. A high-volume stream can exhaust this quota rapidly, blocking all API access for other systems in the same org until the quota resets. Do not use this connector for high-throughput workloads without first calculating your expected daily request volume against your org’s quota.
- Your target SObject is not accessible to the configured integration user.
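Before deploying against a busy topic, it is worth doing the request-volume calculation mentioned above. A rough sketch, where the per-operation request counts follow the behaviour described in this document and the quota figure is a placeholder for your org's actual limit:

```python
# Rough estimate of daily Salesforce API requests consumed by this connector.
# INSERT/UPSERT cost 1 request per record; UPDATE/DELETE in External ID mode
# cost up to 2 (an extra GET to resolve the native ID on a cache miss).
def estimated_daily_requests(records_per_day, update_delete_share=0.0, cache_hit_rate=0.0):
    upserts = records_per_day * (1 - update_delete_share)
    lookups = records_per_day * update_delete_share
    # cache hits skip the resolving GET; misses pay for it
    return int(upserts + lookups * (2 - cache_hit_rate))

ORG_DAILY_QUOTA = 100_000  # placeholder: check your org's actual 24-hour quota

demand = estimated_daily_requests(80_000, update_delete_share=0.25, cache_hit_rate=0.8)
print(demand, demand < ORG_DAILY_QUOTA)  # 84000 True
```

Remember the quota is shared by every integration in the org, so the connector should consume only a fraction of it.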
| Support for the Salesforce Composite REST API (up to 200 records per request) and Bulk API 2.0 (designed for large data volumes) is planned for a future release. If you need high-throughput or bulk write support urgently, contact Axual Support to register your interest. |
Installation
The Salesforce SObject Sink Connector is hosted in a private repository. To obtain the library, contact the Axual Support team.
For installation steps, see Installing Connector Plugins.
Configuration
Configuration options are grouped as follows:
- Authentication — OAuth 2.0 authentication flow and credentials
- Connection — Salesforce instance URL and API version
- SObject Mapping — Target SObject, write operation, and external ID settings
- Error Handling and Retries — Retry behaviour and Dead Letter Queue configuration
- Advanced — Startup validation, compression, and delivery tuning
Authentication
The connector supports two machine-to-machine OAuth 2.0 flows, selected by `salesforce.auth.type`.
Client Credentials Flow — uses a client_id and client_secret pair to obtain an access token. Simpler to set up, but requires storing a client secret. Requires a Run As User assigned on the Connected App’s OAuth Policies page with the API Enabled system permission.
JWT Bearer Flow (RFC 7523) — signs a short-lived JWT with an RSA private key. The private key never leaves the Kafka worker; only signed assertions are sent to Salesforce. Recommended for production environments. Requires an RSA key pair (2048-bit recommended), the certificate uploaded to the Connected App, and the integration user pre-authorized via their profile or permission set.
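The assertion sent in the JWT Bearer flow follows the standard Salesforce shape. A minimal sketch of the claims involved (values are illustrative; the connector builds and RS256-signs this payload internally with the configured private key):

```python
import time

# Claims of the short-lived assertion used in the Salesforce JWT Bearer flow
# (RFC 7523). The connector signs this with the configured RSA private key
# and posts it to the Salesforce token endpoint; only the signed assertion
# leaves the Kafka worker.
def jwt_bearer_claims(consumer_key, username, ttl_seconds=180):
    now = int(time.time())
    return {
        "iss": consumer_key,                    # Connected App Consumer Key
        "sub": username,                        # the integration user's username
        "aud": "https://login.salesforce.com",  # test.salesforce.com for sandboxes
        "exp": now + ttl_seconds,               # short expiry keeps the assertion low-risk
    }

claims = jwt_bearer_claims("3MVG9...", "integration-user@myorg.com")
print(sorted(claims))  # ['aud', 'exp', 'iss', 'sub']
```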
| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| `salesforce.instance.url` | String | Yes | — | Base URL of the Salesforce instance, e.g. https://myorg.my.salesforce.com. |
| `salesforce.client.id` | String | Yes | — | Connected App Consumer Key (OAuth Client ID). Required for both authentication flows. |
| `salesforce.auth.type` | String | Yes | — | Authentication flow to use. Valid values: `CLIENT_CREDENTIALS`, `JWT_BEARER`. |
| `salesforce.client.secret` | Password | Conditional | — | Connected App Consumer Secret. Required when `salesforce.auth.type` is `CLIENT_CREDENTIALS`. |
| `salesforce.jwt.private.key` | Password | Conditional | — | PEM-encoded RSA private key in PKCS#8 format. Required when `salesforce.auth.type` is `JWT_BEARER`. |
| `salesforce.jwt.username` | String | Conditional | — | Salesforce username of the integration user. Used as the JWT subject. Required when `salesforce.auth.type` is `JWT_BEARER`. |
| — | String | No | JWT | — |
Connection
| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| — | String | No | 62.0 | Salesforce REST API version to use. |
| — | Integer | No | 30000 | Per-request HTTP timeout in milliseconds. The TCP connect timeout is fixed at 30 seconds. |
SObject Mapping
| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| `salesforce.sobject.type` | String | Yes | — | API name of the target Salesforce SObject, e.g. `Contact` or `MyObject__c`. |
| `salesforce.operation` | String | No | INSERT | Default write operation applied to every record. Valid values: `INSERT`, `UPDATE`, `DELETE`, `UPSERT`. |
| `salesforce.external.id.field` | String | No | — | Name of the Salesforce External ID field. When set, activates External ID mode and uses the Kafka record key as the external ID value. |
| `salesforce.operation.header.enabled` | Boolean | No | false | When true, the write operation is read from a Kafka record header, overriding `salesforce.operation` per record. |
| `salesforce.tombstone.delete` | Boolean | No | false | When true, tombstone records (null value) are treated as DELETE operations. |
The connector behaves differently depending on whether `salesforce.external.id.field` is configured.
| Every record processed by this connector consumes at least one Salesforce REST API request. Salesforce enforces a 24-hour rolling API request quota per organisation. Monitor your quota carefully in high-throughput environments. |
Native ID mode (no external ID field) — the Kafka record key is used as the native Salesforce record ID for UPDATE and DELETE:
| Operation | Behaviour |
|---|---|
| INSERT | Salesforce generates the record ID. |
| UPDATE | Kafka record key used as the native Salesforce record ID. |
| DELETE | Kafka record key used as the native Salesforce record ID. |
| UPSERT | Not supported in Native ID mode. Configure `salesforce.external.id.field` to enable UPSERT. |
External ID mode (`salesforce.external.id.field` configured) — the Kafka record key is used as the external ID value:
| Operation | Behaviour |
|---|---|
| INSERT | Translated to UPSERT automatically for idempotency. |
| UPDATE | Resolves the native record ID from the external ID, then updates the record (up to two API requests). |
| DELETE | Resolves the native record ID from the external ID, then deletes the record (up to two API requests). |
| UPSERT | Creates the record if no match is found, or updates it in place if a record with that external ID already exists. The Kafka record key is used as the external ID value. This is the most efficient and idempotent write operation in External ID mode: it requires only one API request and is safe to retry. |
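The ID resolution that UPDATE and DELETE perform in External ID mode can be sketched as follows. The lookup function and cache policy below are simplified stand-ins for the connector's internals (the real cache also applies a TTL):

```python
# Simplified sketch of External ID mode resolution: a GET resolves the
# native record ID from the external ID, and an in-memory cache skips that
# GET for external IDs the task has already seen.
class ExternalIdResolver:
    def __init__(self, lookup, max_entries=10_000):
        self._lookup = lookup            # stand-in for the Salesforce GET request
        self._cache = {}
        self._max_entries = max_entries

    def resolve(self, external_id):
        native_id = self._cache.get(external_id)
        if native_id is None:
            native_id = self._lookup(external_id)   # costs one extra API request
            if len(self._cache) < self._max_entries:
                self._cache[external_id] = native_id
        return native_id

calls = []
def fake_lookup(ext_id):
    calls.append(ext_id)
    return "003" + ext_id       # fabricated native-ID shape for illustration

resolver = ExternalIdResolver(fake_lookup)
resolver.resolve("A-1")
resolver.resolve("A-1")         # second call is served from the cache
print(calls)                    # ['A-1']: the GET ran only once
```

Because the cache lives in task memory, a task restart repopulates it from scratch, which is why the first delivery of each external ID after a restart costs the extra request again.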
Error Handling and Retries
| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| `salesforce.retry.max.attempts` | Integer | No | 3 | Maximum in-connector retry attempts for transient errors (rate limits, row locks, network timeouts). |
| `salesforce.retry.initial.backoff.ms` | Integer | No | 1000 | Initial backoff delay before the first retry in milliseconds. Subsequent retries use exponential backoff up to `salesforce.retry.max.backoff.ms`. |
| `salesforce.retry.max.backoff.ms` | Integer | No | 30000 | Maximum backoff cap in milliseconds. Retry delays do not exceed this value. |
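Assuming the usual doubling schedule (the doubling factor is an assumption; the initial delay and cap come from the table above), the defaults produce delays like these:

```python
# Retry delays under the defaults above, assuming exponential backoff that
# doubles the delay each attempt and is capped at the configured maximum.
def backoff_delays(attempts, initial_ms=1000, max_ms=30000):
    return [min(initial_ms * 2 ** n, max_ms) for n in range(attempts)]

print(backoff_delays(3))  # [1000, 2000, 4000]
print(backoff_delays(7))  # [1000, 2000, 4000, 8000, 16000, 30000, 30000]
```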
Recoverable errors are retried by the connector and, when exhausted, thrown as `RetriableException` for the Kafka Connect framework to retry:
- HTTP 5xx server errors
- HTTP 429 / HTTP 503 `CONCURRENT_REQUESTS_LIMIT_EXCEEDED` (rate limits, row locks)
- Network timeouts and connection failures
Non-recoverable errors are routed to the Dead Letter Queue if configured; processing continues with the next record:
- HTTP 4xx client errors (malformed data, invalid field names, permission issues)
- Record not found (404) on DELETE in Native ID mode
- Daily API limit exhausted (`REQUEST_LIMIT_EXCEEDED`)
- Invalid or missing record key for UPDATE, DELETE, or UPSERT
- Unsupported record value type (e.g. plain String or byte array)
- Unrecognized operation value in Kafka header
- Null key on tombstone DELETE
To enable the Dead Letter Queue, add the following properties to the connector configuration:
| Property | Value |
|---|---|
| `errors.tolerance` | all |
| `errors.deadletterqueue.topic.name` | salesforce-sobject-sink-dlq |
| `errors.deadletterqueue.topic.replication.factor` | 1 |
| `errors.deadletterqueue.context.headers.enable` | true |
Advanced
| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| — | Boolean | No | true | Validates OAuth connectivity and (if configured) the existence of the external ID field on startup. |
| — | Boolean | No | true | When |
| — | Boolean | No | false | Enables gzip compression for HTTP request and response bodies. |
The record value must be a Struct (from schema-based converters) or a Map (from schemaless JSON).
Plain String or byte array values are not supported and will be routed to the DLQ.
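The value-type gate described above can be sketched like this, with schemaless JSON values represented as Python dicts. This is an illustration of the rule, not the connector's code:

```python
# Sketch of the record-value check: Struct/Map payloads (dicts here) pass;
# plain strings and byte arrays are rejected and would be routed to the DLQ.
def is_writable_value(value):
    if value is None:
        return True                  # tombstone, handled by the DELETE path
    return isinstance(value, dict)   # Struct/Map-like payloads only

print(is_writable_value({"LastName": "Doe"}))    # True
print(is_writable_value('{"LastName":"Doe"}'))   # False: an un-parsed JSON string
print(is_writable_value(b"\x00\x01"))            # False: raw bytes
```

The second case is the common misconfiguration: a `StringConverter` delivers the JSON document as one opaque string instead of a structured Map.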
| Converter | Class | Supported |
|---|---|---|
| Avro (Confluent) | `io.confluent.connect.avro.AvroConverter` | ✓ |
| JSON Schema (Confluent) | `io.confluent.connect.json.JsonSchemaConverter` | ✓ |
| Protobuf (Confluent) | `io.confluent.connect.protobuf.ProtobufConverter` | ✓ |
| JSON with schema enabled | `org.apache.kafka.connect.json.JsonConverter` | ✓ |
| JSON schemaless | `org.apache.kafka.connect.json.JsonConverter` | ✓ |
| String | `org.apache.kafka.connect.storage.StringConverter` | ✗ |
Getting Started
This section walks you through configuring the Salesforce SObject Sink Connector on Axual to write records from a Kafka stream into a Salesforce SObject.
Prerequisites
Salesforce Connected App
The connector authenticates to Salesforce using OAuth 2.0 with a Connected App.
- In Salesforce Setup, go to App Manager and create a new Connected App (or External Client App).
- Under API (Enable OAuth Settings), enable OAuth and configure the appropriate flow:
  - Client Credentials: enable Enable Client Credentials Flow, then assign a Run As User on the app’s OAuth Policies page. That user must have the API Enabled system permission.
  - JWT Bearer: enable Use digital signatures, generate an RSA key pair (2048-bit recommended), and upload the certificate. Pre-authorize the integration user’s profile or permission set under Manage Connected Apps → OAuth Policies.
- Set the following OAuth scope: Manage user data via APIs (api).
- Note down the Consumer Key (`salesforce.client.id`) and, for Client Credentials, the Consumer Secret (`salesforce.client.secret`).
- Note down your Salesforce Instance URL.
For detailed Salesforce Connected App setup, see the Salesforce Connected Apps documentation.
Salesforce SObject permissions
- Grant the integration user Read, Create, Edit, and Delete object permissions on the target SObject.
- Verify Field-Level Security is configured so the integration user can read and write all fields you intend to sync.
- If using External ID mode, ensure the external ID field is marked as External ID in the SObject field definition.
Axual stream
The stream the connector will consume must already exist in Axual Self-Service. See Creating streams if you need to create it.
Steps
Step 1 — Create a connector application
- In Axual Self-Service, go to Applications and create a new application.
- Request Consumer access to the stream you want to write to Salesforce.
- Wait for the stream access request to be approved.
See Configure and install a connector for detailed steps.
Step 2 — Configure the connector
In Axual Self-Service, open the application and add a new connector. Use the following minimal configuration as a starting point for writing contacts using the JWT Bearer flow. Replace all placeholder values with those of your environment.
| Property | Value |
|---|---|
| `connector.class` | io.axual.connect.plugins.salesforce.SalesforceSinkConnector |
| `tasks.max` | 1 |
| `salesforce.instance.url` | https://myorg.my.salesforce.com |
| `salesforce.client.id` | (your Connected App Consumer Key) |
| `salesforce.auth.type` | JWT_BEARER |
| `salesforce.jwt.private.key` | (your PEM-encoded RSA private key) |
| `salesforce.jwt.username` | integration-user@myorg.com |
| `salesforce.sobject.type` | Contact |
| `salesforce.operation` | INSERT |
| `key.converter` | org.apache.kafka.connect.storage.StringConverter |
| `value.converter` | io.confluent.connect.avro.AvroConverter |
| `value.converter.schema.registry.url` | https://schema-registry:8081 |
| Store `salesforce.client.id` and `salesforce.jwt.private.key` in Vault before starting the connector. See Configure and install a connector for instructions on adding Vault secrets. |
For all available properties, see the Configuration section above.
Step 3 — Start the connector
Start the connector application from Axual Self-Service. Once running, records consumed from the stream will be written to the configured Salesforce SObject.
Known limitations
- API request consumption. Every record consumes at least one Salesforce REST API request. UPDATE and DELETE in External ID mode require an additional GET request to resolve the native record ID; however, an in-memory cache (10,000 entries, 7-day TTL per task) avoids the GET for external IDs already seen within the same task lifetime. The cache is not persisted across task restarts. Salesforce enforces a 24-hour rolling API request quota per organisation — this connector is not suitable for high-throughput streams without first verifying available quota. Support for the Salesforce Composite REST API (up to 200 records per request) and Bulk API 2.0 is planned for a future release. Contact Axual Support to register your interest if you need this sooner.
- UPSERT is not supported in Native ID mode — configure `salesforce.external.id.field` to enable UPSERT.
- Struct-type Kafka record keys are not supported for UPDATE, DELETE, and UPSERT — use a String or numeric key holding the Salesforce record ID or external ID value.
- Field-level permissions are enforced by Salesforce — fields the integration user cannot write to will result in errors routed to the DLQ.
- Only Kafka Connect 2.6 or later is supported (required for `ErrantRecordReporter` / DLQ support).
Examples
Example 1 — Minimal INSERT with Client Credentials authentication
A basic configuration writing records from a Kafka topic to the Contact SObject using INSERT.
Authentication uses the OAuth 2.0 Client Credentials flow with a client ID and secret.
```json
{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforceSinkConnector",
  "tasks.max": "1",
  "salesforce.instance.url": "https://myorg.my.salesforce.com",
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.auth.type": "CLIENT_CREDENTIALS",
  "salesforce.client.secret": "AF97355FC3615CF9FA4EB7BF...",
  "salesforce.sobject.type": "Contact",
  "salesforce.operation": "INSERT",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://schema-registry:8081"
}
```
Example 2 — Minimal INSERT with JWT Bearer authentication
Same as Example 1 but uses the more secure OAuth 2.0 JWT Bearer flow. An RSA private key signs a short-lived JWT assertion; no client secret is stored or transmitted.
```json
{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforceSinkConnector",
  "tasks.max": "1",
  "salesforce.instance.url": "https://myorg.my.salesforce.com",
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.auth.type": "JWT_BEARER",
  "salesforce.jwt.private.key": "-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9...\n-----END PRIVATE KEY-----",
  "salesforce.jwt.username": "integration-user@myorg.com",
  "salesforce.sobject.type": "Contact",
  "salesforce.operation": "INSERT",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://schema-registry:8081"
}
```
Example 3 — UPSERT with External ID field
Uses UPSERT to write records to a custom SObject MyObject__c using the external ID field ExternalId__c.
Records are matched by the Kafka record key and either created or updated idempotently.
A Dead Letter Queue is configured to capture non-recoverable errors.
```json
{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforceSinkConnector",
  "tasks.max": "1",
  "salesforce.instance.url": "https://myorg.my.salesforce.com",
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.auth.type": "JWT_BEARER",
  "salesforce.jwt.private.key": "-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9...\n-----END PRIVATE KEY-----",
  "salesforce.jwt.username": "integration-user@myorg.com",
  "salesforce.sobject.type": "MyObject__c",
  "salesforce.operation": "UPSERT",
  "salesforce.external.id.field": "ExternalId__c",
  "salesforce.retry.max.attempts": "5",
  "salesforce.retry.initial.backoff.ms": "1000",
  "salesforce.retry.max.backoff.ms": "30000",
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "salesforce-sobject-sink-dlq",
  "errors.deadletterqueue.topic.replication.factor": "1",
  "errors.deadletterqueue.context.headers.enable": "true",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://schema-registry:8081"
}
```
Example 4 — CDC stream with header-driven operation routing and DLQ
Consumes a Change Data Capture (CDC) stream where each record carries an operation header (INSERT, UPDATE, or DELETE).
The connector reads the header to route each record to the correct Salesforce write operation.
External ID mode is enabled for idempotent writes.
Tombstone records trigger DELETE operations.
A Dead Letter Queue handles any non-recoverable errors.
```json
{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforceSinkConnector",
  "tasks.max": "1",
  "salesforce.instance.url": "https://myorg.my.salesforce.com",
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.auth.type": "JWT_BEARER",
  "salesforce.jwt.private.key": "-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9...\n-----END PRIVATE KEY-----",
  "salesforce.jwt.username": "integration-user@myorg.com",
  "salesforce.sobject.type": "Contact",
  "salesforce.operation": "UPSERT",
  "salesforce.external.id.field": "ExternalId__c",
  "salesforce.operation.header.enabled": "true",
  "salesforce.tombstone.delete": "true",
  "salesforce.retry.max.attempts": "3",
  "salesforce.retry.initial.backoff.ms": "1000",
  "salesforce.retry.max.backoff.ms": "30000",
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "salesforce-sobject-sink-dlq",
  "errors.deadletterqueue.topic.replication.factor": "1",
  "errors.deadletterqueue.context.headers.enable": "true",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://schema-registry:8081"
}
```
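The per-record routing in this example can be pictured as a small dispatch step. The header key `"operation"` below is an assumption for illustration (the document does not name the exact header the connector reads); the fallback order is how the behaviour described above composes: tombstone first, then header, then the configured default operation.

```python
# Sketch of per-record operation routing as configured in Example 4.
# The header name "operation" is an assumed placeholder, not confirmed by
# this document; check the connector reference for the exact header key.
VALID_OPS = {"INSERT", "UPDATE", "DELETE", "UPSERT"}

def route_operation(headers, value, default_op="UPSERT"):
    if value is None:
        return "DELETE"                  # tombstone -> DELETE (salesforce.tombstone.delete)
    op = headers.get("operation")
    if op is not None:
        op = op.decode() if isinstance(op, bytes) else op
        if op not in VALID_OPS:
            raise ValueError(f"unrecognized operation header: {op}")  # -> DLQ
        return op
    return default_op                    # falls back to salesforce.operation

print(route_operation({"operation": b"UPDATE"}, {"Email": "a@b.c"}))  # UPDATE
print(route_operation({}, None))                                      # DELETE
```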