Salesforce PubSub API Source Connector
| Property | Value |
|---|---|
| Type | Source |
| Class | |
| Target System | Messaging & Streaming (Salesforce Pub/Sub API) |
| Maintainer | Axual |
| License | Proprietary (client-only) |
| Project | Proprietary. Source code is not publicly accessible. |
| Download | Contact Axual Support to obtain the connector library. |
This page documents version 1.1.0. Newer versions should be compatible unless there are breaking changes, but field names or default values may differ. If you notice discrepancies, please contact Axual Support. |
Older Versions
This page covers the latest release. Documentation for previous versions is archived and kept for reference.
Description
The Salesforce PubSub API Source Connector streams events from Salesforce to Kafka topics in real time. It uses the modern gRPC-based Salesforce Pub/Sub API, providing efficient, high-performance ingestion of Change Data Capture (CDC) events, Platform Events, and Real-Time Event Monitoring data. It is a robust alternative to older Streaming API (CometD) based connectors, offering better scalability and a unified eventing interface.
Features
- Stream real-time events from Salesforce to Kafka
- Supports multiple Salesforce event types:
  - Change Data Capture (CDC)
  - Platform Events
  - Real-Time Event Monitoring
- Built on Salesforce Pub/Sub API (gRPC) for high throughput and low latency
- Converts Salesforce Avro schemas to Kafka Connect schemas, enabling structured converters
- Supports at-least-once delivery semantics
- Two authentication flows: OAuth 2.0 Client Credentials and JWT Bearer
- One task per Salesforce topic: tasks.max must be ≥ the number of topics in salesforce.topics
When to Use
- You need real-time streaming of Salesforce CDC events, Platform Events, or Real-Time Event Monitoring data into Kafka.
- You require a high-throughput, low-latency integration with Salesforce using the modern Pub/Sub API.
- You are replacing a CometD-based Salesforce connector and need better scalability.
When NOT to Use
- You only need periodic batch exports from Salesforce; consider a REST-based integration instead.
- Your Salesforce organisation does not support or has not enabled the Pub/Sub API.
Installation
The Salesforce PubSub API Source Connector is hosted in a private repository. To obtain the library, contact the Axual Support team.
For installation steps, see Installing Connector Plugins.
Configuration
Configuration options are grouped as follows:
- Authentication: Salesforce OAuth 2.0 credentials and Pub/Sub API endpoint
- Topic Routing and Replay: Salesforce topics to subscribe to, Kafka topic naming, and replay position
- Performance Tuning: Internal buffers, batch sizes, and polling timeouts
- Connection Reliability (gRPC): gRPC keep-alive settings
- Advanced Configuration: Schema caching and connection validation
Authentication
The connector supports two machine-to-machine authentication flows, selected by salesforce.auth.type.
Client Credentials Flow — uses a client_id and client_secret pair to obtain an access token.
Simpler to set up but requires storing a client secret.
Requires a Run As User assigned on the Connected App’s OAuth Policies page with the API Enabled system permission.
JWT Bearer Flow — signs a short-lived JWT with an RSA private key. The private key never leaves the Kafka Connect worker; only signed assertions are sent to Salesforce. Recommended for production environments. Requires an RSA key pair (2048-bit recommended), the certificate uploaded to the Connected App, and the integration user pre-authorized via their profile or permission set.
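To make the JWT Bearer flow more concrete, the Python sketch below builds the claim set and the unsigned header.payload string of such an assertion. It assumes the claims commonly used for the Salesforce JWT Bearer flow (iss = Consumer Key, sub = integration username, aud = login URL, exp a few minutes ahead). The function names, the audience value, and the 180-second lifetime are illustrative assumptions, not the connector's actual implementation, and the RS256 signing step is left out because it needs a cryptography library.

```python
import base64
import json
import time
from typing import Dict, Optional, Tuple


def b64url(data: bytes) -> str:
    """Base64url-encode without '=' padding, as used in JWTs."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def build_signing_input(
    client_id: str,
    username: str,
    audience: str = "https://login.salesforce.com",  # assumed login URL
    lifetime_seconds: int = 180,                      # assumed short lifetime
    now: Optional[int] = None,
) -> Tuple[Dict[str, object], str]:
    """Return the claim set and the 'header.payload' string that would be signed."""
    issued_at = int(time.time()) if now is None else now
    header = {"alg": "RS256"}
    claims = {
        "iss": client_id,   # Connected App Consumer Key
        "sub": username,    # integration user
        "aud": audience,
        "exp": issued_at + lifetime_seconds,
    }
    parts = [
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    ]
    return claims, ".".join(parts)


# Placeholder values taken from the examples on this page.
claims, signing_input = build_signing_input(
    "3MVG94DAZekw5HctTg8hdQ3FK6...", "integration@your-org.com", now=1_700_000_000
)
print(claims["exp"])  # 1700000180
```

Only the signature over this string, produced with the RSA private key, would be appended and sent; the key itself stays on the worker.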
| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
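| salesforce.client.id | String | Yes | none | Salesforce OAuth Client ID (Connected App Consumer Key). Required for both authentication flows. |
| salesforce.instance.url | String | Yes | none | Salesforce Instance URL, e.g. https://your-domain.my.salesforce.com |
| salesforce.tenant.id | String | Yes | none | Salesforce Organisation ID (Tenant ID). Typically starts with 00D. |
| salesforce.auth.type | String | Yes | none | Authentication flow to use. Valid values: CLIENT_CREDENTIALS, JWT_BEARER. |
| salesforce.client.secret | Password | Conditional | none | Salesforce OAuth Client Secret (Connected App Consumer Secret). Required when salesforce.auth.type is CLIENT_CREDENTIALS. |
| salesforce.jwt.private.key | Password | Conditional | none | PEM-encoded RSA private key in PKCS#8 format. Required when salesforce.auth.type is JWT_BEARER. |
| salesforce.jwt.username | String | Conditional | none | Salesforce username of the integration user (JWT Bearer flow). |
| | String | No | | JWT |
| | String | No | api.pubsub.salesforce.com:7443 | Salesforce Pub/Sub API gRPC endpoint. |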
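The conditional requirements above can be checked before a connector is deployed. The following Python sketch is a hypothetical pre-flight helper, not part of the connector. The key names are the ones used in the examples on this page, and treating salesforce.jwt.username as required for JWT_BEARER is an assumption based on Example 6.

```python
from typing import Dict, List

# Keys every configuration needs, regardless of the authentication flow.
REQUIRED_ALWAYS = [
    "salesforce.client.id",
    "salesforce.instance.url",
    "salesforce.tenant.id",
    "salesforce.auth.type",
]

# Extra keys per flow (the JWT username requirement is an assumption).
REQUIRED_BY_FLOW = {
    "CLIENT_CREDENTIALS": ["salesforce.client.secret"],
    "JWT_BEARER": ["salesforce.jwt.private.key", "salesforce.jwt.username"],
}


def missing_auth_keys(config: Dict[str, str]) -> List[str]:
    """Return the authentication keys that are absent or empty in config."""
    missing = [key for key in REQUIRED_ALWAYS if not config.get(key)]
    flow = config.get("salesforce.auth.type")
    if flow and flow not in REQUIRED_BY_FLOW:
        raise ValueError(f"Unknown salesforce.auth.type: {flow}")
    missing += [key for key in REQUIRED_BY_FLOW.get(flow, []) if not config.get(key)]
    return missing


example = {
    "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
    "salesforce.instance.url": "https://your-domain.my.salesforce.com",
    "salesforce.tenant.id": "00Dd100000APUJi",
    "salesforce.auth.type": "JWT_BEARER",
    "salesforce.jwt.username": "integration@your-org.com",
}
print(missing_auth_keys(example))  # ['salesforce.jwt.private.key']
```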
Topic Routing and Replay
Determines which Salesforce data is ingested, how Kafka topic names are generated, and where to start reading.
| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
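| salesforce.topics | List | Yes | none | Comma-separated list of Salesforce topics to subscribe to. Example: /data/AccountChangeEvent,/data/ContactChangeEvent |
| topic.prefix | String | Yes | none | Prefix for destination Kafka topic names. Salesforce topic path slashes are replaced with dots and this prefix is prepended. Example: prefix salesforce. and topic /event/MyEvent__e produce salesforce.event.MyEvent__e. |
| salesforce.replay.preset | String | No | EARLIEST | Starting position for new subscriptions (when no stored offset exists). |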
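The replay rule described above (a stored offset wins, the preset applies only to new subscriptions) can be sketched in Python as follows. The function name and the STORED_OFFSET label are hypothetical, and only the two preset values that appear in the examples on this page are accepted; the connector may support others.

```python
from typing import Optional, Tuple

VALID_PRESETS = ("EARLIEST", "LATEST")  # values used in the examples on this page


def resolve_start(
    stored_replay_id: Optional[bytes], preset: str = "EARLIEST"
) -> Tuple[str, Optional[bytes]]:
    """Pick the starting position for a subscription."""
    if stored_replay_id is not None:
        # A previously committed offset takes precedence over the preset.
        return ("STORED_OFFSET", stored_replay_id)
    if preset not in VALID_PRESETS:
        raise ValueError(f"Unsupported replay preset: {preset}")
    return (preset, None)


print(resolve_start(None, "LATEST"))         # ('LATEST', None)
print(resolve_start(b"\x00\x01", "LATEST"))  # ('STORED_OFFSET', b'\x00\x01')
```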
Performance Tuning
| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| salesforce.batch.size | Integer | No | 100 | Maximum records returned per poll. |
| salesforce.queue.capacity | Integer | No | 1000 | Maximum events buffered in memory before pausing the gRPC consumer. Increase for high-volume topics; decrease for memory-constrained environments. |
| salesforce.poll.timeout.ms | Long | No | 1000 | Maximum time in milliseconds to wait for events during a poll. |
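How the three settings interact can be illustrated with a small bounded buffer. This Python sketch is an assumption about the general pattern, not the connector's code: the class and method names are hypothetical, and the choice to block only for the first event of a batch is one plausible design among several.

```python
import queue
from typing import Any, List


class EventBuffer:
    """Bounded in-memory buffer between a receiving thread and poll()."""

    def __init__(self, capacity: int = 1000) -> None:
        self._events = queue.Queue(maxsize=capacity)

    def offer(self, event: Any) -> bool:
        """Add an event; False means the buffer is full and the receiver should pause."""
        try:
            self._events.put_nowait(event)
            return True
        except queue.Full:
            return False

    def poll(self, batch_size: int = 100, timeout_ms: int = 1000) -> List[Any]:
        """Return up to batch_size events, waiting at most timeout_ms for the first."""
        batch: List[Any] = []
        if batch_size <= 0:
            return batch
        try:
            # Block for the first event only, up to the poll timeout.
            batch.append(self._events.get(timeout=timeout_ms / 1000.0))
            # Then drain what is already buffered, up to the batch size.
            while len(batch) < batch_size:
                batch.append(self._events.get_nowait())
        except queue.Empty:
            pass
        return batch


buffer = EventBuffer(capacity=3)
accepted = [buffer.offer(n) for n in range(4)]     # [True, True, True, False]
first = buffer.poll(batch_size=2, timeout_ms=10)   # [0, 1]
second = buffer.poll(batch_size=2, timeout_ms=10)  # [2]
third = buffer.poll(batch_size=2, timeout_ms=10)   # []
```

A larger capacity absorbs bursts at the cost of memory, while a larger batch size moves more records per poll.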
Connection Reliability (gRPC)
Controls gRPC channel keep-alive behaviour to prevent firewalls or load balancers from dropping idle connections.
| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| salesforce.grpc.keepalive.time.seconds | Integer | No | 30 | Interval in seconds between keep-alive pings sent to the server. |
| salesforce.grpc.keepalive.timeout.seconds | Integer | No | 10 | Time in seconds to wait for a keep-alive ping response before treating the connection as dead. |
| salesforce.grpc.shutdown.timeout.seconds | Integer | No | 5 | Time in seconds to wait for graceful channel shutdown before forcing termination. |
Advanced Configuration
| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| salesforce.schema.cache.size | Integer | No | 100 | Maximum number of Avro schemas to cache in memory. Increase if the environment uses a large number of distinct schema versions. |
| salesforce.validate.connection | Boolean | No | true | Whether to validate Salesforce connectivity during connector startup. Set to false to skip validation. |
Getting Started
This section walks you through configuring the Salesforce PubSub API Source Connector on Axual to stream events from Salesforce into a local Kafka stream.
Prerequisites
Salesforce Connected App
The connector authenticates to Salesforce using a Connected App. Two authentication flows are supported — choose one.
Client Credentials Flow (simpler setup):
- In Salesforce Setup, create a Connected App with OAuth enabled.
- Enable Client Credentials Flow in the OAuth settings.
- Grant the api and cdpquery OAuth scopes (or as required by your event type).
- Assign a Run As User on the Connected App’s OAuth Policies page.
- Note down the Consumer Key (salesforce.client.id) and Consumer Secret (salesforce.client.secret).
JWT Bearer Flow (RFC 7523) (recommended for production):
- Generate an RSA key pair and upload the X.509 certificate to the Connected App.
  # Generate a 2048-bit RSA key pair
  openssl genrsa -out keypair.pem 2048
  # Convert the private key to unencrypted PKCS#8 PEM
  openssl pkcs8 -topk8 -inform PEM -outform PEM -nocrypt -in keypair.pem -out private_key.pem
  # Create a self-signed X.509 certificate valid for 365 days
  openssl req -new -x509 -nodes -sha256 -days 365 -key keypair.pem -out certificate.crt
- Enable Use Digital Signatures in the Connected App OAuth settings and upload certificate.crt.
- Grant the api and cdpquery OAuth scopes (or as required by your event type).
- Pre-authorize the integration user via the Connected App’s Run As setting or a permission set.
- Note down the Consumer Key (salesforce.client.id) and store private_key.pem in Vault as salesforce.jwt.private.key.
In both cases, note down your Salesforce Instance URL and Organisation ID (starts with 00D).
For detailed Salesforce Connected App setup, see the Salesforce Connected Apps documentation.
Salesforce event type
The Salesforce topic you want to stream must be enabled and accessible.
- Platform Events: must be defined in Salesforce Setup → Platform Events.
- Change Data Capture (CDC): must be enabled for the target object in Salesforce Setup → Change Data Capture.
- Real-Time Event Monitoring: must be licensed and enabled in your Salesforce org.
The connector assigns one task per Salesforce topic. Ensure tasks.max ≥ the number of topics in salesforce.topics.
Axual stream
The local stream where the connector will produce events must already exist in Axual Self-Service.
The stream name will be derived from the Salesforce topic path and the configured topic.prefix.
See Creating streams if you need to create it.
Steps
Step 1 — Create a connector application
- In Axual Self-Service, go to Applications and create a new application.
- Request Producer access to the Axual stream the connector will write to.
- Wait for the stream access request to be approved.
See Configure and install a connector for detailed steps.
Step 2 — Configure the connector
In Axual Self-Service, open the application and add a new connector. Use the following minimal configuration as a starting point for a single Platform Event topic. Replace all placeholder values with those of your environment.
| Property | Value |
|---|---|
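| connector.class | io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector |
| tasks.max | 1 |
| key.converter | org.apache.kafka.connect.storage.StringConverter |
| value.converter | org.apache.kafka.connect.json.JsonConverter |
| value.converter.schemas.enable | true |
| salesforce.client.id | 3MVG94DAZekw5HctTg8hdQ3FK6... |
| salesforce.auth.type | CLIENT_CREDENTIALS |
| salesforce.client.secret | AF97355FC3615CF9FA4EB7BF... |
| salesforce.instance.url | https://your-domain.my.salesforce.com |
| salesforce.tenant.id | 00Dd100000APUJi |
| salesforce.topics | /event/MyEvent__e |
| topic.prefix | salesforce. |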
Store salesforce.client.id and salesforce.client.secret in Vault before starting the connector.
See Configure and install a connector for instructions on adding Vault secrets.
For all available properties, see the Configuration section above.
Step 3 — Start the connector
Start the connector application from Axual Self-Service.
Once running, events from the configured Salesforce topic will be produced to the Axual stream named <topic.prefix><salesforce-topic-path> (slashes replaced with dots).
For example, topic prefix salesforce. and Salesforce topic /event/MyEvent__e produce to salesforce.event.MyEvent__e.
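The naming rule can be expressed as a short Python function. This is a sketch of the rule as described here, not the connector's code: the function name is hypothetical, and dropping the leading slash before replacing the remaining slashes is an assumption inferred from the documented result (a single dot after the prefix).

```python
def kafka_topic_name(topic_prefix: str, salesforce_topic: str) -> str:
    """Derive the destination stream name from the prefix and the Salesforce topic path."""
    # Assumption: the leading slash is dropped so the prefix's own dot is not doubled.
    path = salesforce_topic.lstrip("/")
    return topic_prefix + path.replace("/", ".")


print(kafka_topic_name("salesforce.", "/event/MyEvent__e"))
# salesforce.event.MyEvent__e
print(kafka_topic_name("salesforce.", "/data/AccountChangeEvent"))
# salesforce.data.AccountChangeEvent
```

Running this for every entry in salesforce.topics gives the list of streams that must exist before the connector starts.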
Known limitations
- One connector task is assigned per Salesforce topic: tasks.max must be ≥ the number of topics in salesforce.topics.
- The Salesforce Pub/Sub API must be enabled in your Salesforce organisation.
- Event replay is subject to Salesforce retention windows, typically 3 days for Platform Events and CDC events.
- Real-Time Event Monitoring requires a separate Salesforce license and must be enabled in your org.
Examples
The connector assigns one task per Salesforce topic. Ensure tasks.max ≥ the number of topics in salesforce.topics.
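A quick way to catch a tasks.max mismatch before deployment is to count the topics in the configuration. The Python helper below is a hypothetical pre-flight check, not something the connector ships; it only assumes that salesforce.topics is a comma-separated string and tasks.max is an integer in string form, as in the examples on this page.

```python
from typing import Dict


def check_tasks_max(config: Dict[str, str]) -> int:
    """Return the topic count, or raise if tasks.max is smaller than it."""
    topics = [t.strip() for t in config["salesforce.topics"].split(",") if t.strip()]
    tasks_max = int(config["tasks.max"])
    if tasks_max < len(topics):
        raise ValueError(
            f"tasks.max is {tasks_max} but {len(topics)} topics are configured"
        )
    return len(topics)


config = {
    "tasks.max": "3",
    "salesforce.topics": "/data/AccountChangeEvent,/data/ContactChangeEvent,/data/OpportunityChangeEvent",
}
print(check_tasks_max(config))  # 3
```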
Example 1 — Basic single topic configuration
A basic configuration for a single Platform Event topic using default performance settings.
{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector",
  "tasks.max": "1",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": true,
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.auth.type": "CLIENT_CREDENTIALS",
  "salesforce.client.secret": "AF97355FC3615CF9FA4EB7BF...",
  "salesforce.instance.url": "https://your-domain.my.salesforce.com",
  "salesforce.tenant.id": "00Dd100000APUJi",
  "salesforce.topics": "/event/MyEvent__e",
  "topic.prefix": "salesforce."
}
Example 2 — Multi-topic high throughput
Ingests three CDC streams with tasks.max set to 3.
Batch size and queue capacity are increased to handle bursty traffic.
{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector",
  "tasks.max": "3",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": true,
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.auth.type": "CLIENT_CREDENTIALS",
  "salesforce.client.secret": "AF97355FC3615CF9FA4EB7BF...",
  "salesforce.instance.url": "https://your-domain.my.salesforce.com",
  "salesforce.tenant.id": "00Dd100000APUJi",
  "salesforce.topics": "/data/AccountChangeEvent,/data/ContactChangeEvent,/data/OpportunityChangeEvent",
  "topic.prefix": "salesforce.",
  "salesforce.batch.size": "2000",
  "salesforce.queue.capacity": "10000",
  "salesforce.poll.timeout.ms": "5000",
  "salesforce.replay.preset": "LATEST"
}
Example 3 — Schema-heavy environment
Increases salesforce.schema.cache.size to 500 for environments with many distinct schema versions,
reducing repeated schema parsing overhead.
{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector",
  "tasks.max": "2",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": true,
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.auth.type": "CLIENT_CREDENTIALS",
  "salesforce.client.secret": "AF97355FC3615CF9FA4EB7BF...",
  "salesforce.instance.url": "https://your-domain.my.salesforce.com",
  "salesforce.tenant.id": "00Dd100000APUJi",
  "salesforce.topics": "/data/CaseChangeEvent,/data/AssetChangeEvent",
  "topic.prefix": "salesforce.",
  "salesforce.schema.cache.size": "500",
  "salesforce.batch.size": "500",
  "errors.tolerance": "all",
  "errors.log.enable": "true"
}
Example 4 — Unstable network / firewall configuration
Configures gRPC keep-alive pings every 30 seconds to prevent load balancers from killing idle connections. Connection validation is explicitly enabled to fail fast on startup if Salesforce is unreachable.
{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector",
  "tasks.max": "1",
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.auth.type": "CLIENT_CREDENTIALS",
  "salesforce.client.secret": "AF97355FC3615CF9FA4EB7BF...",
  "salesforce.instance.url": "https://your-domain.my.salesforce.com",
  "salesforce.tenant.id": "00Dd100000APUJi",
  "salesforce.topics": "/event/CriticalSysStatus__e",
  "topic.prefix": "salesforce.",
  "salesforce.validate.connection": "true",
  "salesforce.grpc.keepalive.time.seconds": "30",
  "salesforce.grpc.keepalive.timeout.seconds": "10",
  "salesforce.grpc.shutdown.timeout.seconds": "2"
}
Example 5 — Historical replay (earliest)
Sets salesforce.replay.preset to EARLIEST to read all available historical events from the start of the
Salesforce retention window.
{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector",
  "tasks.max": "1",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true",
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.auth.type": "CLIENT_CREDENTIALS",
  "salesforce.client.secret": "AF97355FC3615CF9FA4EB7BF...",
  "salesforce.instance.url": "https://your-domain.my.salesforce.com",
  "salesforce.tenant.id": "00Dd100000APUJi",
  "salesforce.topics": "/event/AuditLog__e",
  "topic.prefix": "salesforce.",
  "salesforce.replay.preset": "EARLIEST"
}
Example 6 — JWT Bearer authentication
Uses JWT_BEARER flow instead of client credentials. The private key is stored in Vault and never transmitted to Salesforce — only a short-lived signed JWT assertion is exchanged for an access token.
{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector",
  "tasks.max": "1",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": true,
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.auth.type": "JWT_BEARER",
  "salesforce.jwt.private.key": "${keyvault:connectors/<tenant>/<instance>/<env>/<app>:salesforce.jwt.private.key}",
  "salesforce.jwt.username": "integration@your-org.com",
  "salesforce.instance.url": "https://your-domain.my.salesforce.com",
  "salesforce.tenant.id": "00Dd100000APUJi",
  "salesforce.topics": "/event/MyEvent__e",
  "topic.prefix": "salesforce."
}