Salesforce PubSub API Source Connector

Type: Source

Class: io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector

Target System: Messaging & Streaming (Salesforce Pub/Sub API)

Maintainer: Axual

License: Proprietary (client-only)

Project: Source Code

Download: Contact Axual Support to obtain the connector library (private repository).

Description

The Salesforce PubSub API Source Connector streams events from Salesforce to Kafka topics in real time. It uses the modern gRPC-based Salesforce Pub/Sub API, providing efficient, high-performance ingestion of Change Data Capture (CDC) events, Platform Events, and Real-Time Event Monitoring data. It is a robust alternative to older Streaming API (CometD) based connectors, offering better scalability and a unified eventing interface.

Features

  • Stream real-time events from Salesforce to Kafka

  • Supports multiple Salesforce event types:

    • Change Data Capture (CDC)

    • Platform Events

    • Real-Time Event Monitoring

  • Built on Salesforce Pub/Sub API (gRPC) for high throughput and low latency

  • Converts Salesforce Avro schemas to Kafka Connect schemas, enabling structured converters

  • Supports at-least-once delivery semantics

  • Secure authentication using OAuth 2.0 with Connected App client ID and client secret

  • One task per Salesforce topic — tasks.max must be ≥ the number of topics in salesforce.topics

When to Use

  • You need real-time streaming of Salesforce CDC events, Platform Events, or Real-Time Event Monitoring data into Kafka.

  • You require a high-throughput, low-latency integration with Salesforce using the modern Pub/Sub API.

  • You are replacing a CometD-based Salesforce connector and need better scalability.

When NOT to Use

  • You only need periodic batch exports from Salesforce — consider a REST-based integration instead.

  • Your Salesforce organisation does not support or has not enabled the Pub/Sub API.

Installation

The Salesforce PubSub API Source Connector is hosted in a private repository. To obtain the library, contact the Axual Support team.

For installation steps, see Installing Connector Plugins.

Configuration

Configuration options are grouped as follows:

Authentication

Required settings to authenticate against Salesforce and connect to the Pub/Sub API using OAuth 2.0.

  • salesforce.client.id (String, required): Salesforce OAuth Client ID (Connected App Consumer Key).

  • salesforce.client.secret (Password, required): Salesforce OAuth Client Secret (Connected App Consumer Secret).

  • salesforce.instance.url (String, required): Salesforce Instance URL, e.g. https://your-domain.my.salesforce.com.

  • salesforce.tenant.id (String, required): Salesforce Organisation ID (Tenant ID). Typically starts with 00D.

  • salesforce.pubsub.endpoint (String, optional, default api.pubsub.salesforce.com:7443): Salesforce Pub/Sub API gRPC endpoint in host:port format.
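The settings above drive a standard OAuth 2.0 client credentials exchange against the Salesforce token endpoint (/services/oauth2/token). The connector performs this internally; as an illustration only, the token request can be sketched in Python (build_token_request is a hypothetical helper, not part of the connector):

```python
from urllib.parse import urlencode
from urllib.request import Request

def build_token_request(instance_url: str, client_id: str, client_secret: str) -> Request:
    """Build (but do not send) an OAuth 2.0 client-credentials token request."""
    body = urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,        # salesforce.client.id (Consumer Key)
        "client_secret": client_secret,  # salesforce.client.secret (Consumer Secret)
    }).encode()
    return Request(
        f"{instance_url}/services/oauth2/token",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )

req = build_token_request(
    "https://your-domain.my.salesforce.com", "my-consumer-key", "my-consumer-secret"
)
print(req.full_url)
# https://your-domain.my.salesforce.com/services/oauth2/token
```

Sending this request returns a JSON body containing the access token the connector then presents to the Pub/Sub API.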

Topic Routing and Replay

Determines which Salesforce data is ingested, how Kafka topic names are generated, and where to start reading.

  • salesforce.topics (List, required): Comma-separated list of Salesforce topics to subscribe to. Example: /event/MyEvent__e,/data/Account. The number of topics must be ≤ tasks.max.

  • topic.prefix (String, required): Prefix for destination Kafka topic names. Salesforce topic path slashes are replaced with dots and this prefix is prepended. Example: prefix salesforce. + source /event/MyEvent__e → Kafka topic salesforce.event.MyEvent__e.

  • salesforce.replay.preset (String, optional, default EARLIEST): Starting position for new subscriptions (when no stored offset exists). EARLIEST: read from the beginning of the retained event stream. LATEST: start from the current tip, skipping historical events.
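The topic-name derivation described for topic.prefix can be sketched in a few lines (kafka_topic_name is an illustrative helper, not part of the connector):

```python
def kafka_topic_name(prefix: str, salesforce_topic: str) -> str:
    """Derive the destination Kafka topic name: drop the leading slash,
    replace the remaining slashes with dots, and prepend the prefix."""
    return prefix + salesforce_topic.lstrip("/").replace("/", ".")

print(kafka_topic_name("salesforce.", "/event/MyEvent__e"))
# salesforce.event.MyEvent__e
print(kafka_topic_name("salesforce.", "/data/AccountChangeEvent"))
# salesforce.data.AccountChangeEvent
```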

Performance Tuning

  • salesforce.batch.size (Integer, optional, default 100): Maximum records returned per poll() call. Salesforce supports a maximum of 10,000 events per request.

  • salesforce.queue.capacity (Integer, optional, default 1000): Maximum events buffered in memory before pausing the gRPC consumer. Increase for high-volume topics; decrease for memory-constrained environments.

  • salesforce.poll.timeout.ms (Long, optional, default 1000): Maximum time in milliseconds to wait for events during a poll() operation.
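How these three settings typically interact can be shown with a simplified poll loop: events arrive on a bounded in-memory queue (queue.capacity), and each poll() drains at most batch.size records, waiting at most poll.timeout.ms overall. This is a sketch of the general pattern, not the connector's actual implementation:

```python
import queue
import time

def poll(buffer: "queue.Queue[str]", batch_size: int, timeout_ms: float) -> list:
    """Drain up to batch_size events, waiting at most timeout_ms in total."""
    records: list = []
    deadline = time.monotonic() + timeout_ms / 1000.0
    while len(records) < batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # poll.timeout.ms exhausted
        try:
            records.append(buffer.get(timeout=remaining))
        except queue.Empty:
            break  # no more events within the timeout
    return records

buf = queue.Queue(maxsize=1000)  # salesforce.queue.capacity
for i in range(250):
    buf.put(f"event-{i}")

batch = poll(buf, batch_size=100, timeout_ms=1000)  # batch.size / poll.timeout.ms
print(len(batch))  # 100
```

A larger queue capacity absorbs bursts from Salesforce; a larger batch size reduces per-poll overhead at the cost of latency and memory.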

Connection Reliability (gRPC)

Controls gRPC channel keep-alive behaviour to prevent firewalls or load balancers from dropping idle connections.

  • salesforce.grpc.keepalive.time.seconds (Integer, optional, default 30): Interval in seconds between keep-alive pings sent to the server.

  • salesforce.grpc.keepalive.timeout.seconds (Integer, optional, default 10): Time in seconds to wait for a keep-alive ping response before treating the connection as dead.

  • salesforce.grpc.shutdown.timeout.seconds (Integer, optional, default 5): Time in seconds to wait for graceful channel shutdown before forcing termination.
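For intuition, the keep-alive settings correspond to the standard millisecond-based gRPC channel arguments grpc.keepalive_time_ms and grpc.keepalive_timeout_ms. The sketch below shows that mapping; grpc_channel_options is a hypothetical helper, and how the connector applies these options internally is an assumption:

```python
def grpc_channel_options(keepalive_time_s: int = 30, keepalive_timeout_s: int = 10):
    """Translate second-based connector settings into the millisecond-based
    channel arguments understood by gRPC clients."""
    return [
        ("grpc.keepalive_time_ms", keepalive_time_s * 1000),
        ("grpc.keepalive_timeout_ms", keepalive_timeout_s * 1000),
    ]

# A gRPC client would pass these when opening the channel, e.g.:
# grpc.secure_channel(endpoint, credentials, options=grpc_channel_options())
print(grpc_channel_options())
```

Keeping the ping interval below your firewall or load-balancer idle timeout is what prevents silent connection drops.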

Advanced Configuration

  • salesforce.schema.cache.size (Integer, optional, default 100): Maximum number of Avro schemas to cache in memory. Increase if the environment uses a large number of distinct schema versions.

  • salesforce.validate.connection (Boolean, optional, default true): Whether to validate Salesforce connectivity during connector startup. Set to false to skip this check if Salesforce is not reachable at startup.
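A bounded schema cache like the one salesforce.schema.cache.size configures is typically a small LRU structure: when the cache is full, the least recently used schema is evicted. The sketch below illustrates the idea only; it is not the connector's internal data structure:

```python
from collections import OrderedDict

class SchemaCache:
    """Illustrative bounded LRU cache keyed by schema ID."""

    def __init__(self, capacity: int = 100):  # salesforce.schema.cache.size
        self.capacity = capacity
        self._store: "OrderedDict[str, object]" = OrderedDict()

    def get(self, schema_id: str):
        if schema_id in self._store:
            self._store.move_to_end(schema_id)  # mark as recently used
            return self._store[schema_id]
        return None  # cache miss: caller must re-fetch and parse the schema

    def put(self, schema_id: str, schema: object) -> None:
        self._store[schema_id] = schema
        self._store.move_to_end(schema_id)
        if len(self._store) > self.capacity:
            self._store.popitem(last=False)  # evict least recently used

cache = SchemaCache(capacity=2)
cache.put("AccountChangeEvent-v1", {"type": "record"})
```

If your org churns through more distinct schema versions than the cache holds, every miss costs a schema fetch and parse, which is why the size should track the number of versions in play.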

Getting Started

This section walks you through configuring the Salesforce PubSub API Source Connector on Axual to stream events from Salesforce into a local Kafka stream.

Prerequisites

Salesforce Connected App

The connector authenticates to Salesforce using OAuth 2.0 with a Connected App (client ID and client secret).

  1. In Salesforce Setup, create a Connected App with OAuth enabled.

  2. Enable the OAuth client credentials flow for the Connected App, so that the connector can authenticate with the client ID and client secret alone (no certificate or JWT digital signatures are required).

  3. Grant the connected app the api and cdpquery OAuth scopes (or as required by your event type).

  4. Note down the Consumer Key (salesforce.client.id) and Consumer Secret (salesforce.client.secret).

  5. Note down your Salesforce Instance URL and Organisation ID (starts with 00D).

For detailed Salesforce Connected App setup, see the Salesforce Connected Apps documentation.

Salesforce event type

The Salesforce topic you want to stream must be enabled and accessible.

  • Platform Events: must be defined in Salesforce Setup → Platform Events.

  • Change Data Capture (CDC): must be enabled for the target object in Salesforce Setup → Change Data Capture.

  • Real-Time Event Monitoring: must be licensed and enabled in your Salesforce org.

The connector assigns one task per Salesforce topic. Ensure tasks.max ≥ the number of topics in salesforce.topics.
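The tasks.max constraint above can be checked before deploying; validate_task_count is a hypothetical helper for illustration, not part of the connector:

```python
def validate_task_count(salesforce_topics: str, tasks_max: int) -> int:
    """Raise if tasks.max cannot cover one task per configured Salesforce topic."""
    topics = [t.strip() for t in salesforce_topics.split(",") if t.strip()]
    if tasks_max < len(topics):
        raise ValueError(
            f"tasks.max={tasks_max} but salesforce.topics lists {len(topics)} "
            "topics; each topic needs its own task"
        )
    return len(topics)

print(validate_task_count("/data/AccountChangeEvent,/data/ContactChangeEvent", 2))
# 2
```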

Axual stream

The local stream where the connector will produce events must already exist in Axual Self-Service. The stream name will be derived from the Salesforce topic path and the configured topic.prefix. See Creating streams if you need to create it.

Steps

Step 1 — Create a connector application

  1. In Axual Self-Service, go to Applications and create a new application.

  2. Request Producer access to the Axual stream the connector will write to.

  3. Wait for the stream access request to be approved.

See Configure and install a connector for detailed steps.

Step 2 — Configure the connector

In Axual Self-Service, open the application and add a new connector. Use the following minimal configuration as a starting point for a single Platform Event topic. Replace all placeholder values with those of your environment.

  • connector.class: io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector

  • tasks.max: 1 (increase to match the number of topics in salesforce.topics)

  • salesforce.client.id: ${keyvault:connectors/<tenant>/<instance>/<env>/<app>:salesforce.client.id}

  • salesforce.client.secret: ${keyvault:connectors/<tenant>/<instance>/<env>/<app>:salesforce.client.secret}

  • salesforce.instance.url: https://your-domain.my.salesforce.com

  • salesforce.tenant.id: 00D… (your Salesforce Organisation ID)

  • salesforce.topics: /event/MyEvent__e

  • topic.prefix: salesforce.

  • key.converter: org.apache.kafka.connect.storage.StringConverter

  • value.converter: org.apache.kafka.connect.json.JsonConverter

  • value.converter.schemas.enable: true

Store salesforce.client.id and salesforce.client.secret in Vault before starting the connector. See Configure and install a connector for instructions on adding Vault secrets.

For all available properties, see the Configuration section above.

Step 3 — Start the connector

Start the connector application from Axual Self-Service. Once running, events from the configured Salesforce topic will be produced to the Axual stream named <topic.prefix><salesforce-topic-path> (slashes replaced with dots).

For example, topic prefix salesforce. and Salesforce topic /event/MyEvent__e produce the Kafka topic salesforce.event.MyEvent__e.

Step 4 — Verify

In Axual Self-Service, use the stream-browse feature on the target stream to confirm that Salesforce events are arriving.

Cleanup

When you are done testing:

  1. Stop the connector application in Axual Self-Service.

  2. Remove stream access for the application if it is no longer needed.

Known limitations

  • One connector task is assigned per Salesforce topic — tasks.max must be ≥ the number of topics in salesforce.topics.

  • Only OAuth 2.0 with Connected App client ID and client secret is supported — other authentication flows are not supported.

  • The Salesforce Pub/Sub API must be enabled in your Salesforce organisation.

  • Event replay is subject to Salesforce retention windows — typically 3 days for Platform Events and CDC events.

  • Real-Time Event Monitoring requires a separate Salesforce license and must be enabled in your org.

Examples

The connector assigns one task per Salesforce topic. Ensure tasks.max ≥ the number of topics in salesforce.topics.

Example 1 — Basic single topic configuration

A basic configuration for a single Platform Event topic using default performance settings.

{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector",
  "tasks.max": "1",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": true,
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.client.secret": "AF97355FC3615CF9FA4EB7BF...",
  "salesforce.instance.url": "https://your-domain.my.salesforce.com",
  "salesforce.tenant.id": "00Dd100000APUJi",
  "salesforce.topics": "/event/MyEvent__e",
  "topic.prefix": "salesforce."
}

Example 2 — Multi-topic high throughput

Ingests three CDC streams with tasks.max set to 3. Batch size and queue capacity are increased to handle bursty traffic.

{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector",
  "tasks.max": "3",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": true,
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.client.secret": "AF97355FC3615CF9FA4EB7BF...",
  "salesforce.instance.url": "https://your-domain.my.salesforce.com",
  "salesforce.tenant.id": "00Dd100000APUJi",
  "salesforce.topics": "/data/AccountChangeEvent,/data/ContactChangeEvent,/data/OpportunityChangeEvent",
  "topic.prefix": "salesforce.",
  "salesforce.batch.size": "2000",
  "salesforce.queue.capacity": "10000",
  "salesforce.poll.timeout.ms": "5000",
  "salesforce.replay.preset": "LATEST"
}

Example 3 — Schema-heavy environment

Increases salesforce.schema.cache.size to 500 for environments with many distinct schema versions, reducing repeated schema parsing overhead. It also sets errors.tolerance to all with error logging enabled, so records that fail conversion are logged and skipped instead of failing the task.

{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector",
  "tasks.max": "2",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": true,
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.client.secret": "AF97355FC3615CF9FA4EB7BF...",
  "salesforce.instance.url": "https://your-domain.my.salesforce.com",
  "salesforce.tenant.id": "00Dd100000APUJi",
  "salesforce.topics": "/data/CaseChangeEvent,/data/AssetChangeEvent",
  "topic.prefix": "salesforce.",
  "salesforce.schema.cache.size": "500",
  "salesforce.batch.size": "500",
  "errors.tolerance": "all",
  "errors.log.enable": "true"
}

Example 4 — Unstable network / firewall configuration

Configures gRPC keep-alive pings every 30 seconds to prevent load balancers from killing idle connections. Connection validation is explicitly enabled to fail fast on startup if Salesforce is unreachable, and the shutdown timeout is lowered to 2 seconds so restarts complete quickly.

{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector",
  "tasks.max": "1",
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.client.secret": "AF97355FC3615CF9FA4EB7BF...",
  "salesforce.instance.url": "https://your-domain.my.salesforce.com",
  "salesforce.tenant.id": "00Dd100000APUJi",
  "salesforce.topics": "/event/CriticalSysStatus__e",
  "topic.prefix": "salesforce.",
  "salesforce.validate.connection": "true",
  "salesforce.grpc.keepalive.time.seconds": "30",
  "salesforce.grpc.keepalive.timeout.seconds": "10",
  "salesforce.grpc.shutdown.timeout.seconds": "2"
}

Example 5 — Historical replay (earliest)

Sets salesforce.replay.preset to EARLIEST to read all available historical events from the start of the Salesforce retention window.

{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector",
  "tasks.max": "1",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true",
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.client.secret": "AF97355FC3615CF9FA4EB7BF...",
  "salesforce.instance.url": "https://your-domain.my.salesforce.com",
  "salesforce.tenant.id": "00Dd100000APUJi",
  "salesforce.topics": "/event/AuditLog__e",
  "topic.prefix": "salesforce.",
  "salesforce.replay.preset": "EARLIEST"
}

License

This connector is licensed under a proprietary client-only license. It is provided exclusively to authorized clients and may not be redistributed, modified, or used outside of the terms agreed with Axual.