Salesforce PubSub API Source Connector
| | |
|---|---|
| Type | Source |
| Class | `io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector` |
| Target System | Messaging & Streaming (Salesforce Pub/Sub API) |
| Maintainer | Axual |
| License | Proprietary (client-only) |
| Project | — |
| Download | Contact Axual Support to obtain the connector library (private repository). |
Description
The Salesforce PubSub API Source Connector streams events from Salesforce to Kafka topics in real time. It uses the modern gRPC-based Salesforce Pub/Sub API, providing efficient, high-performance ingestion of Change Data Capture (CDC) events, Platform Events, and Real-Time Event Monitoring data. It is a robust alternative to older Streaming API (CometD) based connectors, offering better scalability and a unified eventing interface.
Features
- Stream real-time events from Salesforce to Kafka
- Supports multiple Salesforce event types:
  - Change Data Capture (CDC)
  - Platform Events
  - Real-Time Event Monitoring
- Built on Salesforce Pub/Sub API (gRPC) for high throughput and low latency
- Converts Salesforce Avro schemas to Kafka Connect schemas, enabling structured converters
- Supports at-least-once delivery semantics
- Secure authentication using OAuth 2.0 with Connected App client ID and client secret
- One task per Salesforce topic — `tasks.max` must be ≥ the number of topics in `salesforce.topics`
When to Use
- You need real-time streaming of Salesforce CDC events, Platform Events, or Real-Time Event Monitoring data into Kafka.
- You require a high-throughput, low-latency integration with Salesforce using the modern Pub/Sub API.
- You are replacing a CometD-based Salesforce connector and need better scalability.
When NOT to Use
- You only need periodic batch exports from Salesforce — consider a REST-based integration instead.
- Your Salesforce organisation does not support or has not enabled the Pub/Sub API.
Installation
The Salesforce PubSub API Source Connector is hosted in a private repository. To obtain the library, contact the Axual Support team.
For installation steps, see Installing Connector Plugins.
Configuration
Configuration options are grouped as follows:
- Authentication — Salesforce OAuth 2.0 credentials and Pub/Sub API endpoint
- Topic Routing and Replay — Salesforce topics to subscribe to, Kafka topic naming, and replay position
- Performance Tuning — internal buffers, batch sizes, and polling timeouts
- Connection Reliability (gRPC) — gRPC keep-alive settings
- Advanced Configuration — schema caching and connection validation
Authentication
Required settings to authenticate against Salesforce and connect to the Pub/Sub API using OAuth 2.0.
| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| `salesforce.client.id` | String | Yes | — | Salesforce OAuth Client ID (Connected App Consumer Key). |
| `salesforce.client.secret` | Password | Yes | — | Salesforce OAuth Client Secret (Connected App Consumer Secret). |
| `salesforce.instance.url` | String | Yes | — | Salesforce Instance URL, e.g. `https://your-domain.my.salesforce.com`. |
| `salesforce.tenant.id` | String | Yes | — | Salesforce Organisation ID (Tenant ID). Typically starts with `00D`. |
| | String | No | api.pubsub.salesforce.com:7443 | Salesforce Pub/Sub API gRPC endpoint in `host:port` format. |
Topic Routing and Replay
Determines which Salesforce data is ingested, how Kafka topic names are generated, and where to start reading.
| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| `salesforce.topics` | List | Yes | — | Comma-separated list of Salesforce topics to subscribe to. Example: `/event/MyEvent__e,/data/AccountChangeEvent`. |
| `topic.prefix` | String | Yes | — | Prefix for destination Kafka topic names. Salesforce topic path slashes are replaced with dots and this prefix is prepended. Example: prefix `salesforce.` and topic `/event/MyEvent__e` produce `salesforce.event.MyEvent__e`. |
| `salesforce.replay.preset` | String | No | EARLIEST | Starting position for new subscriptions (when no stored offset exists), e.g. `EARLIEST` or `LATEST`. |
Performance Tuning
| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| `salesforce.batch.size` | Integer | No | 100 | Maximum records returned per `poll()` call. |
| `salesforce.queue.capacity` | Integer | No | 1000 | Maximum events buffered in memory before pausing the gRPC consumer. Increase for high-volume topics; decrease for memory-constrained environments. |
| `salesforce.poll.timeout.ms` | Long | No | 1000 | Maximum time in milliseconds to wait for events during a `poll()` call. |
Connection Reliability (gRPC)
Controls gRPC channel keep-alive behaviour to prevent firewalls or load balancers from dropping idle connections.
| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| `salesforce.grpc.keepalive.time.seconds` | Integer | No | 30 | Interval in seconds between keep-alive pings sent to the server. |
| `salesforce.grpc.keepalive.timeout.seconds` | Integer | No | 10 | Time in seconds to wait for a keep-alive ping response before treating the connection as dead. |
| `salesforce.grpc.shutdown.timeout.seconds` | Integer | No | 5 | Time in seconds to wait for graceful channel shutdown before forcing termination. |
Advanced Configuration
| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| `salesforce.schema.cache.size` | Integer | No | 100 | Maximum number of Avro schemas to cache in memory. Increase if the environment uses a large number of distinct schema versions. |
| `salesforce.validate.connection` | Boolean | No | true | Whether to validate Salesforce connectivity during connector startup. Set to `false` to skip this check. |
Getting Started
This section walks you through configuring the Salesforce PubSub API Source Connector on Axual to stream events from Salesforce into a local Kafka stream.
Prerequisites
Salesforce Connected App
The connector authenticates to Salesforce using OAuth 2.0 with a Connected App (client ID and client secret).
- In Salesforce Setup, create a Connected App with OAuth enabled.
- Enable Use digital signatures and upload a certificate (the connector signs JWTs with the corresponding private key).
- Grant the Connected App the `api` and `cdpquery` OAuth scopes (or as required by your event type).
- Note down the Consumer Key (`salesforce.client.id`) and Consumer Secret (`salesforce.client.secret`).
- Note down your Salesforce Instance URL and Organisation ID (starts with `00D`).
For detailed Salesforce Connected App setup, see the Salesforce Connected Apps documentation.
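If you want to verify the Connected App credentials before configuring the connector, a direct token request is a quick sanity check. The sketch below is not part of the connector; it assumes the Connected App has the OAuth 2.0 client credentials flow enabled, and the class name and `SF_CLIENT_ID`/`SF_CLIENT_SECRET` environment variables are illustrative only.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical standalone check, not part of the connector: requests an access token
// via the OAuth 2.0 client credentials flow to confirm that the Connected App
// credentials and instance URL are valid before configuring the connector.
public class SalesforceCredentialCheck {
    public static void main(String[] args) throws Exception {
        String instanceUrl = "https://your-domain.my.salesforce.com"; // salesforce.instance.url
        String clientId = System.getenv("SF_CLIENT_ID");              // salesforce.client.id
        String clientSecret = System.getenv("SF_CLIENT_SECRET");      // salesforce.client.secret

        // Values are assumed to be URL-safe; URL-encode them otherwise.
        String body = "grant_type=client_credentials"
                + "&client_id=" + clientId
                + "&client_secret=" + clientSecret;

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(instanceUrl + "/services/oauth2/token"))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // A 200 response containing "access_token" means the credentials work.
        System.out.println(response.statusCode());
        System.out.println(response.body());
    }
}
```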
Salesforce event type
The Salesforce topic you want to stream must be enabled and accessible.
- Platform Events: must be defined in Salesforce Setup → Platform Events.
- Change Data Capture (CDC): must be enabled for the target object in Salesforce Setup → Change Data Capture.
- Real-Time Event Monitoring: must be licensed and enabled in your Salesforce org.

The connector assigns one task per Salesforce topic. Ensure `tasks.max` ≥ the number of topics in `salesforce.topics`.
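As a quick pre-flight check you can count the configured topics yourself. The snippet below is an illustrative sketch only, not connector code; the topic list and task count are example values.

```java
// Illustrative pre-flight check (not part of the connector): the connector creates one
// task per Salesforce topic, so tasks.max must be at least the number of entries
// in salesforce.topics.
public final class TaskCountCheck {
    public static void main(String[] args) {
        String salesforceTopics = "/data/AccountChangeEvent,/data/ContactChangeEvent"; // salesforce.topics
        int tasksMax = 2;                                                              // tasks.max

        int topicCount = salesforceTopics.split(",").length;
        if (tasksMax < topicCount) {
            throw new IllegalStateException(
                "tasks.max (" + tasksMax + ") is lower than the number of Salesforce topics (" + topicCount + ")");
        }
        System.out.println("tasks.max covers all " + topicCount + " Salesforce topics");
    }
}
```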
Axual stream
The local stream where the connector will produce events must already exist in Axual Self-Service.
The stream name will be derived from the Salesforce topic path and the configured `topic.prefix`.
See Creating streams if you need to create it.
Steps
Step 1 — Create a connector application
- In Axual Self-Service, go to Applications and create a new application.
- Request Producer access to the Axual stream the connector will write to.
- Wait for the stream access request to be approved.
See Configure and install a connector for detailed steps.
Step 2 — Configure the connector
In Axual Self-Service, open the application and add a new connector. Use the following minimal configuration as a starting point for a single Platform Event topic. Replace all placeholder values with those of your environment.
| Property | Value |
|---|---|
| `connector.class` | `io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector` |
| `tasks.max` | `1` |
| `key.converter` | `org.apache.kafka.connect.storage.StringConverter` |
| `value.converter` | `org.apache.kafka.connect.json.JsonConverter` |
| `value.converter.schemas.enable` | `true` |
| `salesforce.client.id` | Your Connected App Consumer Key |
| `salesforce.client.secret` | Your Connected App Consumer Secret |
| `salesforce.instance.url` | `https://your-domain.my.salesforce.com` |
| `salesforce.tenant.id` | Your Salesforce Organisation ID (starts with `00D`) |
| `salesforce.topics` | `/event/MyEvent__e` |
| `topic.prefix` | `salesforce.` |

Store `salesforce.client.id` and `salesforce.client.secret` in Vault before starting the connector.
See Configure and install a connector for instructions on adding Vault secrets.
For all available properties, see the Configuration section above.
Step 3 — Start the connector
Start the connector application from Axual Self-Service.
Once running, events from the configured Salesforce topic will be produced to the Axual stream named `<topic.prefix><salesforce-topic-path>` (slashes replaced with dots).
For example, topic prefix `salesforce.` and Salesforce topic `/event/MyEvent__e` produce the stream `salesforce.event.MyEvent__e`.
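The naming rule can be expressed in a few lines. The helper below is a minimal sketch for reference only, not part of the connector; it assumes the leading slash of the Salesforce topic path is dropped and the remaining slashes become dots.

```java
// Hypothetical helper illustrating the naming rule described above; not connector code.
public final class TopicNames {
    // Assumes the leading slash is dropped and the remaining slashes are replaced with dots.
    static String kafkaTopicFor(String topicPrefix, String salesforceTopic) {
        String path = salesforceTopic.startsWith("/") ? salesforceTopic.substring(1) : salesforceTopic;
        return topicPrefix + path.replace('/', '.');
    }

    public static void main(String[] args) {
        System.out.println(kafkaTopicFor("salesforce.", "/event/MyEvent__e"));        // salesforce.event.MyEvent__e
        System.out.println(kafkaTopicFor("salesforce.", "/data/AccountChangeEvent")); // salesforce.data.AccountChangeEvent
    }
}
```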
Known limitations
- One connector task is assigned per Salesforce topic — `tasks.max` must be ≥ the number of topics in `salesforce.topics`.
- Only OAuth 2.0 with Connected App client ID and client secret is supported — other authentication flows are not supported.
- The Salesforce Pub/Sub API must be enabled in your Salesforce organisation.
- Event replay is subject to Salesforce retention windows — typically 3 days for Platform Events and CDC events.
- Real-Time Event Monitoring requires a separate Salesforce license and must be enabled in your org.
Examples
The connector assigns one task per Salesforce topic. Ensure `tasks.max` ≥ the number of topics in `salesforce.topics`.
Example 1 — Basic single topic configuration
A basic configuration for a single Platform Event topic using default performance settings.
```json
{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector",
  "tasks.max": "1",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true",
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.client.secret": "AF97355FC3615CF9FA4EB7BF...",
  "salesforce.instance.url": "https://your-domain.my.salesforce.com",
  "salesforce.tenant.id": "00Dd100000APUJi",
  "salesforce.topics": "/event/MyEvent__e",
  "topic.prefix": "salesforce."
}
```
Example 2 — Multi-topic high throughput
Ingests three CDC streams with `tasks.max` set to 3. Batch size and queue capacity are increased to handle bursty traffic.
```json
{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector",
  "tasks.max": "3",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true",
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.client.secret": "AF97355FC3615CF9FA4EB7BF...",
  "salesforce.instance.url": "https://your-domain.my.salesforce.com",
  "salesforce.tenant.id": "00Dd100000APUJi",
  "salesforce.topics": "/data/AccountChangeEvent,/data/ContactChangeEvent,/data/OpportunityChangeEvent",
  "topic.prefix": "salesforce.",
  "salesforce.batch.size": "2000",
  "salesforce.queue.capacity": "10000",
  "salesforce.poll.timeout.ms": "5000",
  "salesforce.replay.preset": "LATEST"
}
```
Example 3 — Schema-heavy environment
Increases `salesforce.schema.cache.size` to 500 for environments with many distinct schema versions, reducing repeated schema parsing overhead.
```json
{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector",
  "tasks.max": "2",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true",
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.client.secret": "AF97355FC3615CF9FA4EB7BF...",
  "salesforce.instance.url": "https://your-domain.my.salesforce.com",
  "salesforce.tenant.id": "00Dd100000APUJi",
  "salesforce.topics": "/data/CaseChangeEvent,/data/AssetChangeEvent",
  "topic.prefix": "salesforce.",
  "salesforce.schema.cache.size": "500",
  "salesforce.batch.size": "500",
  "errors.tolerance": "all",
  "errors.log.enable": "true"
}
```
Example 4 — Unstable network / firewall configuration
Configures gRPC keep-alive pings every 30 seconds to prevent load balancers from killing idle connections. Connection validation is explicitly enabled to fail fast on startup if Salesforce is unreachable.
```json
{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector",
  "tasks.max": "1",
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.client.secret": "AF97355FC3615CF9FA4EB7BF...",
  "salesforce.instance.url": "https://your-domain.my.salesforce.com",
  "salesforce.tenant.id": "00Dd100000APUJi",
  "salesforce.topics": "/event/CriticalSysStatus__e",
  "topic.prefix": "salesforce.",
  "salesforce.validate.connection": "true",
  "salesforce.grpc.keepalive.time.seconds": "30",
  "salesforce.grpc.keepalive.timeout.seconds": "10",
  "salesforce.grpc.shutdown.timeout.seconds": "2"
}
```
Example 5 — Historical replay (earliest)
Sets `salesforce.replay.preset` to `EARLIEST` to read all available historical events from the start of the Salesforce retention window.
```json
{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector",
  "tasks.max": "1",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true",
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.client.secret": "AF97355FC3615CF9FA4EB7BF...",
  "salesforce.instance.url": "https://your-domain.my.salesforce.com",
  "salesforce.tenant.id": "00Dd100000APUJi",
  "salesforce.topics": "/event/AuditLog__e",
  "topic.prefix": "salesforce.",
  "salesforce.replay.preset": "EARLIEST"
}
```