Example configurations for the Salesforce PubSub API Source Connector

Examples

The Salesforce PubSub API Source Connector assigns one task per Salesforce topic. When configuring the connector, ensure that tasks.max is greater than or equal to the number of topics specified in salesforce.topics.
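The task-count rule above can be checked before a configuration is deployed. The following is a minimal sketch in Python and is not part of the connector: the helper name `missing_tasks` is hypothetical, and it assumes that salesforce.topics is a comma-separated list (as in the examples on this page) and that tasks.max falls back to the Kafka Connect default of 1 when it is not set.

```python
def missing_tasks(config: dict) -> int:
    """Return how many extra tasks are needed to cover every topic (0 if enough)."""
    # Assumption: topics are comma-separated; blank entries are ignored.
    topics = [t.strip() for t in config.get("salesforce.topics", "").split(",") if t.strip()]
    # Kafka Connect defaults tasks.max to 1 when it is not set.
    tasks_max = int(config.get("tasks.max", "1"))
    return max(0, len(topics) - tasks_max)


# Hypothetical misconfiguration: three topics but only two tasks.
config = {
    "tasks.max": "2",
    "salesforce.topics": "/data/AccountChangeEvent,/data/ContactChangeEvent,/data/OpportunityChangeEvent",
}
shortfall = missing_tasks(config)
if shortfall:
    print(f"tasks.max is too low: raise it by at least {shortfall}")
```

A check like this could run in a deployment pipeline so that a topic added to salesforce.topics without a matching tasks.max increase is caught early.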

Example 1 - Basic Single Topic Configuration

A basic configuration for a single Platform Event topic. This setup uses standard converters and default performance settings.

{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector",
  "tasks.max": "1",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true",
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.client.secret": "AF97355FC3615CF9FA4EB7BF...",
  "salesforce.instance.url": "https://your-domain.my.salesforce.com",
  "salesforce.tenant.id": "00Dd100000APUJi",
  "salesforce.topics": "/event/MyEvent__e",
  "topic.prefix": "salesforce."
}

Example 2 - Multi-Topic High Throughput

A configuration designed for ingesting multiple high-volume Change Data Capture (CDC) streams.

  • Tasks: tasks.max is set to 3 to match the three topics listed in salesforce.topics.

  • Batching & Queueing: salesforce.batch.size and salesforce.queue.capacity are increased to reduce network overhead and absorb bursty traffic.

  • Poll Timeout: salesforce.poll.timeout.ms is increased to allow the connector to fill larger batches before returning.

{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector",
  "tasks.max": "3",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true",
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.client.secret": "AF97355FC3615CF9FA4EB7BF...",
  "salesforce.instance.url": "https://your-domain.my.salesforce.com",
  "salesforce.tenant.id": "00Dd100000APUJi",
  "salesforce.topics": "/data/AccountChangeEvent,/data/ContactChangeEvent,/data/OpportunityChangeEvent",
  "topic.prefix": "salesforce.",
  "salesforce.batch.size": "2000",
  "salesforce.queue.capacity": "10000",
  "salesforce.poll.timeout.ms": "5000",
  "salesforce.replay.preset": "LATEST"
}

Example 3 - Schema Heavy Environment

This configuration is optimized for environments with frequently changing schemas or complex object graphs.

  • Schema Cache: The salesforce.schema.cache.size is increased from the default (100) to 500.

  • Impact: In a CDC scenario where Salesforce objects have many schema versions, a small cache evicts schemas that are still in use, so the connector has to parse them again when they reappear, which slows down processing. Increasing this value trades a small amount of memory for reduced latency.

{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector",
  "tasks.max": "2",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true",
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.client.secret": "AF97355FC3615CF9FA4EB7BF...",
  "salesforce.instance.url": "https://your-domain.my.salesforce.com",
  "salesforce.tenant.id": "00Dd100000APUJi",
  "salesforce.topics": "/data/CaseChangeEvent,/data/AssetChangeEvent",
  "topic.prefix": "salesforce.",
  "salesforce.schema.cache.size": "500",
  "salesforce.batch.size": "500",
  "errors.tolerance": "all",
  "errors.log.enable": "true"
}
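The effect of the cache size described above can be illustrated with a small simulation. This is a sketch under stated assumptions, not the connector's implementation: the source does not say which eviction policy the connector uses, so least-recently-used (LRU) eviction is assumed here, and the `SchemaCache` class and `count_parses` function are hypothetical names introduced only for this illustration.

```python
from collections import OrderedDict


class SchemaCache:
    """Toy LRU cache that counts how often a schema has to be parsed."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self._entries = OrderedDict()
        self.parse_count = 0

    def get(self, schema_id: int) -> str:
        if schema_id in self._entries:
            # Cache hit: mark the entry as most recently used.
            self._entries.move_to_end(schema_id)
            return self._entries[schema_id]
        # Cache miss: "parse" the schema (stand-in for the real parsing work).
        self.parse_count += 1
        parsed = f"parsed:{schema_id}"
        self._entries[schema_id] = parsed
        if len(self._entries) > self.capacity:
            # Evict the least recently used entry.
            self._entries.popitem(last=False)
        return parsed


def count_parses(capacity: int, distinct_schemas: int, rounds: int) -> int:
    """Cycle through all schema ids `rounds` times and return the number of parses."""
    cache = SchemaCache(capacity)
    for _ in range(rounds):
        for schema_id in range(distinct_schemas):
            cache.get(schema_id)
    return cache.parse_count


# 150 schema versions in rotation, 10 passes over them.
print(count_parses(capacity=100, distinct_schemas=150, rounds=10))  # 1500
print(count_parses(capacity=500, distinct_schemas=150, rounds=10))  # 150
```

In this simplified model, a cache smaller than the working set misses on every lookup, while a cache that holds all active schema versions parses each one only once.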

Example 4 - Unstable Network / Firewall Configuration

A configuration tuned for environments with aggressive firewalls or unstable network connections.

  • Connection Validation: Explicitly enabled (salesforce.validate.connection) to fail fast if the connector cannot authenticate at startup.

  • gRPC Keepalive: Tuned to send pings every 30 seconds (salesforce.grpc.keepalive.time.seconds) to prevent firewalls and load balancers from dropping idle connections.

  • Shutdown Timeout: Reduced to 2 seconds (salesforce.grpc.shutdown.timeout.seconds) to allow faster connector restarts during rebalancing.

{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector",
  "tasks.max": "1",
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.client.secret": "AF97355FC3615CF9FA4EB7BF...",
  "salesforce.instance.url": "https://your-domain.my.salesforce.com",
  "salesforce.tenant.id": "00Dd100000APUJi",
  "salesforce.topics": "/event/CriticalSysStatus__e",
  "topic.prefix": "salesforce.",
  "salesforce.validate.connection": "true",
  "salesforce.grpc.keepalive.time.seconds": "30",
  "salesforce.grpc.keepalive.timeout.seconds": "10",
  "salesforce.grpc.shutdown.timeout.seconds": "2"
}

Example 5 - Historical Replay (Earliest)

A configuration for re-processing data from the beginning of the Salesforce retention window.

  • Replay Preset: Set to EARLIEST to ignore the current "tip" of the stream and read all available history.

  • Converters: Uses the JSON value converter with schemas enabled, a setup often used when debugging historical data flows.

{
  "connector.class": "io.axual.connect.plugins.salesforce.SalesforcePubSubSourceConnector",
  "tasks.max": "1",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true",
  "salesforce.client.id": "3MVG94DAZekw5HctTg8hdQ3FK6...",
  "salesforce.client.secret": "AF97355FC3615CF9FA4EB7BF...",
  "salesforce.instance.url": "https://your-domain.my.salesforce.com",
  "salesforce.tenant.id": "00Dd100000APUJi",
  "salesforce.topics": "/event/AuditLog__e",
  "topic.prefix": "salesforce.",
  "salesforce.replay.preset": "EARLIEST"
}