Salesforce SObject Sink Connector

Type

Sink

Class

io.axual.connect.plugins.salesforce.SalesforceSinkConnector

Target System

CRM (Salesforce SObject)

Maintainer

Axual

License

Proprietary (client-only)

Project

Source Code

Download

Contact Axual Support to obtain the connector library (private repository).

Description

The Salesforce SObject Sink Connector writes records from Kafka topics directly into Salesforce SObjects using the Salesforce REST API. It supports any standard or custom SObject (e.g. Contact, Account, MyObject__c) and four write operations: INSERT, UPDATE, DELETE, and UPSERT. Two machine-to-machine OAuth 2.0 authentication flows are supported: Client Credentials and JWT Bearer (RFC 7523).

Features

  • Write Kafka records to any standard or custom Salesforce SObject

  • Four write operations: INSERT, UPDATE, DELETE, and UPSERT

  • Two authentication modes: OAuth 2.0 Client Credentials and JWT Bearer (RFC 7523)

  • Idempotent writes via external ID fields (UPSERT)

  • Header-driven operation routing

  • Tombstone record support for DELETE operations

  • Automatic OAuth token refresh with a proactive two-minute buffer

  • Dead Letter Queue (DLQ) support for non-recoverable errors via Kafka Connect’s ErrantRecordReporter

  • Exponential backoff retry strategy for transient errors

  • Startup validation of OAuth connectivity and external ID field existence

  • API usage limit awareness with warnings at low quota thresholds

  • Optional gzip HTTP compression for large payloads

  • Logical type conversion for Date, Timestamp, Time, and Decimal Kafka fields

When to Use

  • You need to write Kafka records into Salesforce SObjects in real time (e.g. syncing orders, contacts, or custom objects).

  • You require idempotent writes using Salesforce External ID fields (UPSERT).

  • You are consuming a CDC or mixed-operation stream and need per-record operation routing via Kafka headers.

  • You want a secure, secret-free authentication setup using the JWT Bearer flow.

When NOT to Use

  • You need to read from Salesforce — consider the Salesforce PubSub API Source Connector instead.

  • Your topic has high message throughput. This connector issues one Salesforce REST API request per record for INSERT and UPSERT operations. UPDATE and DELETE in External ID mode require an additional GET request to resolve the native record ID, though an in-memory cache reduces this to one request for records with a previously seen external ID within the same task lifetime. Salesforce enforces a 24-hour rolling API request quota per organisation. A high-volume stream can exhaust this quota rapidly, blocking all API access for other systems in the same org until the quota resets. Do not use this connector for high-throughput workloads without first calculating your expected daily request volume against your org’s quota.

  • Your target SObject is not accessible to the configured integration user.

Support for the Salesforce Composite REST API (up to 200 records per request) and Bulk API 2.0 (designed for large data volumes) is planned for a future release. If you need high-throughput or bulk write support urgently, contact Axual Support to register your interest.

Installation

The Salesforce SObject Sink Connector is hosted in a private repository. To obtain the library, contact the Axual Support team.

For installation steps, see Installing Connector Plugins.

Configuration

Configuration options are grouped as follows:

Authentication

The connector supports two machine-to-machine OAuth 2.0 flows selected by salesforce.auth.type.

Client Credentials Flow — uses a client_id and client_secret pair to obtain an access token. Simpler to set up but requires storing a client secret. Requires a Run As User assigned on the Connected App’s OAuth Policies page with the API Enabled system permission.

JWT Bearer Flow (RFC 7523) — signs a short-lived JWT with an RSA private key. The private key never leaves the Kafka worker; only signed assertions are sent to Salesforce. Recommended for production environments. Requires an RSA key pair (2048-bit recommended), the certificate uploaded to the Connected App, and the integration user pre-authorized via their profile or permission set.
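To make the JWT Bearer flow concrete, the sketch below builds the unsigned header.payload signing input for the assertion using only the Python standard library. The claim set (iss = Consumer Key, sub = integration username, aud = login URL, short-lived exp) follows RFC 7523 as used by Salesforce; the 180-second lifetime is an assumption, and a real client must still sign this input with the RSA private key using RS256 (for example via a JOSE library) before posting it to the token endpoint.

```python
import base64
import json
import time

def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as required for JWT segments."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")

def jwt_signing_input(client_id: str, username: str,
                      audience: str = "https://login.salesforce.com",
                      lifetime_s: int = 180) -> str:
    """Build the header.payload signing input for the Salesforce JWT
    Bearer flow (RFC 7523). The result must still be signed with the
    RSA private key using RS256 before it becomes a valid assertion."""
    header = {"alg": "RS256"}
    claims = {
        "iss": client_id,                      # Connected App Consumer Key
        "sub": username,                       # integration user's username
        "aud": audience,                       # test.salesforce.com for sandboxes
        "exp": int(time.time()) + lifetime_s,  # short-lived expiry (assumed 180 s)
    }
    return b64url(json.dumps(header).encode()) + "." + b64url(json.dumps(claims).encode())
```

Because only the signed assertion leaves the worker, the private key itself is never transmitted to Salesforce.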

Key Type Mandatory Default Description

salesforce.instance.url

String

Yes

Base URL of the Salesforce instance, e.g. https://myorg.my.salesforce.com.

salesforce.client.id

String

Yes

Connected App Consumer Key (OAuth Client ID). Required for both authentication flows.

salesforce.auth.type

String

Yes

Authentication flow to use. Valid values: CLIENT_CREDENTIALS, JWT_BEARER.

salesforce.client.secret

Password

Conditional

Connected App Consumer Secret. Required when salesforce.auth.type is CLIENT_CREDENTIALS.

salesforce.jwt.private.key

Password

Conditional

PEM-encoded RSA private key in PKCS#8 format. Required when salesforce.auth.type is JWT_BEARER.

salesforce.jwt.username

String

Conditional

Salesforce username of the integration user. Used as the JWT sub claim. Required when salesforce.auth.type is JWT_BEARER.

salesforce.jwt.audience

String

No

https://login.salesforce.com

JWT aud claim. Use https://test.salesforce.com for sandbox environments.

Connection

Key Type Mandatory Default Description

salesforce.api.version

String

No

62.0

Salesforce REST API version to use.

salesforce.request.timeout.ms

Integer

No

30000

Per-request HTTP timeout in milliseconds. The TCP connect timeout is fixed at 30 seconds.

SObject Mapping

Key Type Mandatory Default Description

salesforce.sobject.type

String

Yes

API name of the target Salesforce SObject, e.g. Contact, Account, MyCustomObject__c.

salesforce.operation

String

No

INSERT

Default write operation applied to every record. Valid values: INSERT, UPDATE, DELETE, UPSERT.
Can be overridden per record via the operation Kafka header when salesforce.operation.header.enabled is true.

salesforce.external.id.field

String

No

Name of the Salesforce External ID field. When set, activates External ID mode and uses the Kafka record key as the external ID value.
The field must be marked as External ID in Salesforce. Required to use UPSERT.

salesforce.tombstone.delete

Boolean

No

false

When true, records with a null value (tombstones) are treated as DELETE operations. Requires a non-null Kafka record key.

salesforce.operation.header.enabled

Boolean

No

false

When true, reads the operation Kafka record header to override the static salesforce.operation value per record.
Useful for CDC streams that carry mixed INSERT, UPDATE, and DELETE events on a single topic.
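The interplay of salesforce.operation, the operation header, and tombstone handling can be modelled as a small decision function. This is an illustrative sketch of the documented behaviour, not the connector's actual code; in particular, the precedence shown (tombstone first, then header, then the static default) is an assumption.

```python
VALID_OPS = {"INSERT", "UPDATE", "DELETE", "UPSERT"}

def resolve_operation(value, headers, default_op="INSERT",
                      header_enabled=False, tombstone_delete=False):
    """Return the write operation applied to a single Kafka record.
    Precedence (tombstone > header > static default) is an assumption."""
    if value is None and tombstone_delete:
        return "DELETE"  # tombstone records become deletes
    if header_enabled and "operation" in headers:
        op = str(headers["operation"]).upper()
        if op not in VALID_OPS:
            # the connector routes records with an unrecognized
            # operation header to the DLQ as non-recoverable
            raise ValueError(f"unrecognized operation header: {op}")
        return op
    return default_op
```

For a CDC topic, the upstream producer would set the operation header per record, and the connector would fan each record out to the matching Salesforce write.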

The connector behaves differently depending on whether salesforce.external.id.field is configured.

Every record processed by this connector consumes at least one Salesforce REST API request. Salesforce enforces a 24-hour rolling API request quota per organisation. Monitor your quota carefully in high-throughput environments.
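A back-of-the-envelope check of expected request volume against your quota can be done before deploying. The quota figure below is hypothetical; look up your org's actual limit in Salesforce Setup.

```python
def daily_api_requests(records_per_second: float,
                       requests_per_record: int = 1) -> int:
    """Estimate Salesforce REST API requests consumed per 24 hours.
    Use requests_per_record=2 for uncached UPDATE/DELETE in External
    ID mode, which needs an extra GET to resolve the native ID."""
    return int(records_per_second * 86_400 * requests_per_record)

# Hypothetical figures: a modest 10 records/s stream against an
# assumed 1,000,000-request org quota.
daily = daily_api_requests(10)  # 864,000 requests per day
org_quota = 1_000_000           # assumption; varies by org and edition
headroom = org_quota - daily    # 136,000 requests left for other systems
```

Even a modest stream consumes most of the assumed quota, which is why the connector is unsuitable for high-throughput topics until batch APIs are supported.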

Native ID mode (no external ID field) — the Kafka record key is used as the native Salesforce record ID for UPDATE and DELETE:

Operation Behaviour

INSERT

Salesforce generates the record ID.

UPDATE

Kafka record key used as the native Salesforce record ID.

DELETE

Kafka record key used as the native Salesforce record ID.

UPSERT

Not supported in Native ID mode. Configure salesforce.external.id.field to enable UPSERT.

External ID mode (salesforce.external.id.field configured) — the Kafka record key is used as the external ID value:

Operation Behaviour

INSERT

Translated to UPSERT automatically for idempotency.

UPDATE

Resolves the native record ID from the external ID, then PATCH. The native ID is cached in memory (up to 10,000 entries, 7-day TTL), so repeated external IDs within the same task lifetime skip the GET lookup overhead.

DELETE

Resolves the native record ID from the external ID, then DELETE. A 404 on lookup is silently skipped (idempotent). The native ID is cached in the same way as UPDATE.

UPSERT

Creates the record if no match is found, or updates it in place if a record with that external ID already exists. The Kafka record key is used as the external ID value. This is the most efficient and idempotent write operation in External ID mode: it requires only one API request and is safe to retry.
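The native-ID cache used by UPDATE and DELETE in External ID mode behaves like a size- and time-bounded map. The sketch below mirrors the documented limits (10,000 entries, 7-day TTL); the insertion-order eviction policy is an assumption.

```python
import time
from collections import OrderedDict

class NativeIdCache:
    """Sketch of the per-task external-ID-to-native-ID cache.
    Limits mirror the documented 10,000 entries and 7-day TTL;
    the oldest-first eviction policy is an assumption."""

    def __init__(self, max_entries: int = 10_000,
                 ttl_s: float = 7 * 24 * 3600):
        self.max_entries, self.ttl_s = max_entries, ttl_s
        self._data: "OrderedDict[str, tuple[str, float]]" = OrderedDict()

    def get(self, external_id: str):
        entry = self._data.get(external_id)
        if entry is None:
            return None  # miss: a GET lookup against Salesforce is needed
        native_id, stored_at = entry
        if time.time() - stored_at > self.ttl_s:
            del self._data[external_id]  # expired entry
            return None
        return native_id

    def put(self, external_id: str, native_id: str) -> None:
        self._data[external_id] = (native_id, time.time())
        self._data.move_to_end(external_id)
        while len(self._data) > self.max_entries:
            self._data.popitem(last=False)  # evict the oldest entry
```

Because the cache lives in task memory, a task restart starts cold and the first UPDATE or DELETE per external ID again costs two API requests.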

Error Handling and Retries

Key Type Mandatory Default Description

salesforce.retry.max.attempts

Integer

No

3

Maximum in-connector retry attempts for transient errors (rate limits, row locks, network timeouts).
Valid range: 0–10. Set to 0 to disable in-connector retries and rely solely on the Kafka Connect framework.

salesforce.retry.initial.backoff.ms

Integer

No

1000

Initial backoff delay before the first retry in milliseconds. Subsequent retries use exponential backoff up to salesforce.retry.max.backoff.ms.

salesforce.retry.max.backoff.ms

Integer

No

30000

Maximum backoff cap in milliseconds. Retry delays do not exceed this value.
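With the defaults above, the retry delays follow a capped doubling schedule. The sketch below shows the arithmetic; whether the connector adds random jitter is not documented, so none is shown.

```python
def backoff_ms(attempt: int, initial_ms: int = 1000,
               max_ms: int = 30_000) -> int:
    """Delay before retry number `attempt` (0-based): exponential
    doubling from the initial delay, capped at the maximum."""
    return min(initial_ms * (2 ** attempt), max_ms)

# With the defaults, the first six delays are:
# 1000, 2000, 4000, 8000, 16000, 30000 (capped) milliseconds.
```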

Recoverable errors are retried by the connector and, when exhausted, thrown as RetriableException for the Kafka Connect framework to retry:

  • HTTP 5xx server errors

  • HTTP 429 / HTTP 503 CONCURRENT_REQUESTS_LIMIT_EXCEEDED (rate limits, row locks)

  • Network timeouts and connection failures

Non-recoverable errors are routed to the Dead Letter Queue if configured; processing continues with the next record:

  • HTTP 4xx client errors (malformed data, invalid field names, permission issues)

  • Record not found (404) on DELETE in Native ID mode

  • Daily API limit exhausted (REQUEST_LIMIT_EXCEEDED)

  • Invalid or missing record key for UPDATE, DELETE, or UPSERT

  • Unsupported record value type (e.g. plain String or byte array)

  • Unrecognized operation value in Kafka header

  • Null key on tombstone DELETE

To enable the Dead Letter Queue, add the following properties to the connector configuration:

Property Value

errors.tolerance

all

errors.deadletterqueue.topic.name

<your-dlq-topic-name>

errors.deadletterqueue.topic.replication.factor

1

errors.deadletterqueue.context.headers.enable

true

Advanced

Key Type Mandatory Default Description

salesforce.validate.connection

Boolean

No

true

Validates OAuth connectivity and (if configured) the existence of the external ID field on startup.
Set to false only for debugging.

salesforce.check.api.limits.on.start

Boolean

No

true

When true, calls the Salesforce /limits endpoint once during startup to log the remaining 24-hour API quota and the minimum safe request interval.
Emits a WARN when remaining quota drops below 20% and an ERROR below 10%.
This check is non-fatal. If the endpoint is unreachable the connector logs a warning and continues to start.
Set to false if the Connected App lacks permission to call the limits endpoint.

salesforce.http.compression.enabled

Boolean

No

false

Enables gzip compression for HTTP request and response bodies.
Useful for reducing network bandwidth with large payloads.

The record value must be a Struct (from schema-based converters) or a Map (from schemaless JSON). Plain String or byte array values are not supported and will be routed to the DLQ.
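For example, with the schemaless JsonConverter (schemas.enable=false) a record value such as the following deserializes to a Map whose keys must match the SObject's field API names (the field values here are illustrative):

```json
{
  "FirstName": "Ada",
  "LastName": "Lovelace",
  "Email": "ada.lovelace@example.com"
}
```

The Kafka record key is carried separately and supplies the native record ID or external ID value for UPDATE, DELETE, and UPSERT.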

Converter Class Supported

Avro (Confluent)

io.confluent.connect.avro.AvroConverter

Yes

JSON Schema (Confluent)

io.confluent.connect.json.JsonSchemaConverter

Yes

Protobuf (Confluent)

io.confluent.connect.protobuf.ProtobufConverter

Yes

JSON with schema enabled

org.apache.kafka.connect.json.JsonConverter (schemas.enable=true)

Yes

JSON schemaless

org.apache.kafka.connect.json.JsonConverter (schemas.enable=false)

Yes

String

org.apache.kafka.connect.storage.StringConverter

Keys only — plain String record values are routed to the DLQ

Getting Started

This section walks you through configuring the Salesforce SObject Sink Connector on Axual to write records from a Kafka stream into a Salesforce SObject.

Prerequisites

Salesforce Connected App

The connector authenticates to Salesforce using OAuth 2.0 with a Connected App.

  1. In Salesforce Setup, go to App Manager and create a new Connected App (or External Client App).

  2. Under API (Enable OAuth Settings), enable OAuth and configure the appropriate flow:

    • Client Credentials: enable Enable Client Credentials Flow, then assign a Run As User on the app’s OAuth Policies page. That user must have the API Enabled system permission.

    • JWT Bearer: enable Use digital signatures, generate an RSA key pair (2048-bit recommended), and upload the certificate. Pre-authorize the integration user’s profile or permission set under Manage Connected Apps → OAuth Policies.

  3. Set the following OAuth scope: Manage user data via APIs (api).

  4. Note down the Consumer Key (salesforce.client.id) and, for Client Credentials, the Consumer Secret (salesforce.client.secret).

  5. Note down your Salesforce Instance URL.

For detailed Salesforce Connected App setup, see the Salesforce Connected Apps documentation.

Salesforce SObject permissions

  1. Grant the integration user Read, Create, Edit, and Delete object permissions on the target SObject.

  2. Verify Field-Level Security is configured so the integration user can read and write all fields you intend to sync.

  3. If using External ID mode, ensure the external ID field is marked as External ID in the SObject field definition.

Axual stream

The stream the connector will consume must already exist in Axual Self-Service. See Creating streams if you need to create it.

Steps

Step 1 — Create a connector application

  1. In Axual Self-Service, go to Applications and create a new application.

  2. Request Consumer access to the stream you want to write to Salesforce.

  3. Wait for the stream access request to be approved.

See Configure and install a connector for detailed steps.

Step 2 — Configure the connector

In Axual Self-Service, open the application and add a new connector. Use the following minimal configuration as a starting point for writing contacts using the JWT Bearer flow. Replace all placeholder values with those of your environment.

Property Value

connector.class

io.axual.connect.plugins.salesforce.SalesforceSinkConnector

tasks.max

1

salesforce.instance.url

https://your-domain.my.salesforce.com

salesforce.client.id

${keyvault:connectors/<tenant>/<instance>/<env>/<app>:salesforce.client.id}

salesforce.auth.type

JWT_BEARER

salesforce.jwt.private.key

${keyvault:connectors/<tenant>/<instance>/<env>/<app>:salesforce.jwt.private.key}

salesforce.jwt.username

integration-user@myorg.com

salesforce.sobject.type

Contact

salesforce.operation

INSERT

key.converter

org.apache.kafka.connect.storage.StringConverter

value.converter

io.confluent.connect.avro.AvroConverter

value.converter.schema.registry.url

<schema-registry-url>

Store salesforce.client.id and salesforce.jwt.private.key in Vault before starting the connector. See Configure and install a connector for instructions on adding Vault secrets.

For all available properties, see the Configuration section above.

Step 3 — Start the connector

Start the connector application from Axual Self-Service. Once running, records consumed from the stream will be written to the configured Salesforce SObject.

Step 4 — Verify

In Salesforce, open the target SObject’s list view to confirm that records are being created or updated. If a Dead Letter Queue is configured, check the DLQ topic in Axual Self-Service for any rejected records.

Cleanup

When you are done testing:

  1. Stop the connector application in Axual Self-Service.

  2. Remove stream access for the application if it is no longer needed.

Known limitations

  • API request consumption. Every record consumes at least one Salesforce REST API request. UPDATE and DELETE in External ID mode require an additional GET request to resolve the native record ID; however, an in-memory cache (10,000 entries, 7-day TTL per task) avoids the GET for external IDs already seen within the same task lifetime. The cache is not persisted across task restarts. Salesforce enforces a 24-hour rolling API request quota per organisation — this connector is not suitable for high-throughput streams without first verifying available quota. Support for the Salesforce Composite REST API (up to 200 records per request) and Bulk API 2.0 is planned for a future release. Contact Axual Support to register your interest if you need this sooner.

  • UPSERT is not supported in Native ID mode — configure salesforce.external.id.field to enable UPSERT.

  • Struct-type Kafka record keys are not supported for UPDATE, DELETE, and UPSERT — use a String or numeric key holding the Salesforce record ID or external ID value.

  • Field-level permissions are enforced by Salesforce — fields the integration user cannot write to will result in errors routed to the DLQ.

  • Only Kafka Connect 2.6 or later is supported (required for ErrantRecordReporter / DLQ support).

Examples

Example 1 — Minimal INSERT with Client Credentials authentication

A basic configuration writing records from a Kafka topic to the Contact SObject using INSERT. Authentication uses the OAuth 2.0 Client Credentials flow with a client ID and secret.

{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforceSinkConnector",
  "tasks.max": "1",
  "salesforce.instance.url": "https://myorg.my.salesforce.com",
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.auth.type": "CLIENT_CREDENTIALS",
  "salesforce.client.secret": "AF97355FC3615CF9FA4EB7BF...",
  "salesforce.sobject.type": "Contact",
  "salesforce.operation": "INSERT",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://schema-registry:8081"
}

Example 2 — Minimal INSERT with JWT Bearer authentication

Same as Example 1 but uses the more secure OAuth 2.0 JWT Bearer flow. An RSA private key signs a short-lived JWT assertion; no client secret is stored or transmitted.

{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforceSinkConnector",
  "tasks.max": "1",
  "salesforce.instance.url": "https://myorg.my.salesforce.com",
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.auth.type": "JWT_BEARER",
  "salesforce.jwt.private.key": "-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9...\n-----END PRIVATE KEY-----",
  "salesforce.jwt.username": "integration-user@myorg.com",
  "salesforce.sobject.type": "Contact",
  "salesforce.operation": "INSERT",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://schema-registry:8081"
}

Example 3 — UPSERT with External ID field

Uses UPSERT to write records to a custom SObject MyObject__c using the external ID field ExternalId__c. Records are matched by the Kafka record key and either created or updated idempotently. A Dead Letter Queue is configured to capture non-recoverable errors.

{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforceSinkConnector",
  "tasks.max": "1",
  "salesforce.instance.url": "https://myorg.my.salesforce.com",
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.auth.type": "JWT_BEARER",
  "salesforce.jwt.private.key": "-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9...\n-----END PRIVATE KEY-----",
  "salesforce.jwt.username": "integration-user@myorg.com",
  "salesforce.sobject.type": "MyObject__c",
  "salesforce.operation": "UPSERT",
  "salesforce.external.id.field": "ExternalId__c",
  "salesforce.retry.max.attempts": "5",
  "salesforce.retry.initial.backoff.ms": "1000",
  "salesforce.retry.max.backoff.ms": "30000",
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "salesforce-sobject-sink-dlq",
  "errors.deadletterqueue.topic.replication.factor": "1",
  "errors.deadletterqueue.context.headers.enable": "true",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://schema-registry:8081"
}

Example 4 — CDC stream with header-driven operation routing and DLQ

Consumes a Change Data Capture (CDC) stream where each record carries an operation header (INSERT, UPDATE, or DELETE). The connector reads the header to route each record to the correct Salesforce write operation. External ID mode is enabled for idempotent writes. Tombstone records trigger DELETE operations. A Dead Letter Queue handles any non-recoverable errors.

{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforceSinkConnector",
  "tasks.max": "1",
  "salesforce.instance.url": "https://myorg.my.salesforce.com",
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.auth.type": "JWT_BEARER",
  "salesforce.jwt.private.key": "-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9...\n-----END PRIVATE KEY-----",
  "salesforce.jwt.username": "integration-user@myorg.com",
  "salesforce.sobject.type": "Contact",
  "salesforce.operation": "UPSERT",
  "salesforce.external.id.field": "ExternalId__c",
  "salesforce.operation.header.enabled": "true",
  "salesforce.tombstone.delete": "true",
  "salesforce.retry.max.attempts": "3",
  "salesforce.retry.initial.backoff.ms": "1000",
  "salesforce.retry.max.backoff.ms": "30000",
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "salesforce-sobject-sink-dlq",
  "errors.deadletterqueue.topic.replication.factor": "1",
  "errors.deadletterqueue.context.headers.enable": "true",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://schema-registry:8081"
}

License

This connector is licensed under a proprietary client-only license. It is provided exclusively to authorized clients and may not be redistributed, modified, or used outside of the terms agreed with Axual.