Google Pub/Sub source and sink connectors

Google Pub/Sub

The CloudPubSubConnector is a Kafka Connect connector that publishes messages from Kafka to Google Cloud Pub/Sub or Pub/Sub Lite, and vice versa.

If you already have a publicly accessible deployment of Google Pub/Sub, you can configure and install a connector right away using the Axual Self-Service UI. Use the configuration instructions below.

If you don’t have one available, follow the next section to deploy a publicly accessible instance of Google Pub/Sub.

Deploying a Google Pub/Sub instance

We’ll deploy a Google Pub/Sub instance using Google Cloud Services.

If you already have a Google account, you can sign up for a free trial of Google Cloud here.

You can read more about Google Pub/Sub on Google Cloud here.

Let’s get started.

  1. Create a service account here (you might be prompted to select your Google project).

    • At the top of the page, click +CREATE SERVICE ACCOUNT. A new page will open. Fill in the following:

      • Service account name: my-pubsub-admin

      • Service account ID: my-pubsub-admin

      • Service account description: "Temporary admin account to manage Google Pub/Sub"

    • Click CREATE AND CONTINUE

    • We’ll grant this service account access to the project through two roles.

      • Click the dropdown called "Select a role" and, in the filter box, type Pub/Sub Lite Admin. Click "Pub/Sub Lite Admin".

      • Click +ADD ANOTHER ROLE. Repeat the previous step with Pub/Sub Admin as a filter.

    • We are not granting users access to this service account, so click Done instead.

  2. Click the newly created service account. A new page will open.

  3. Click the KEYS tab.

  4. Click the ADD KEY button and select "Create a new key". Select "JSON" as the key type and click CREATE.
    A key will be created and automatically downloaded to your machine. We’ll refer to it later as "the downloaded key-file".
    You can now close the "Service accounts" tab.

  5. Enable Google Pub/Sub for your project by visiting this page.

  6. At the top of the page, click + CREATE TOPIC

    • Topic ID: my_pubsub_google_topic

    • Check the "Add a default subscription" box (should be checked by default)

    • Do not check the "schema" and "message retention" boxes (should be unchecked by default)

    • Encryption: Use a "Google-managed encryption key" (should be the default option)

    • Click CREATE TOPIC
      This topic’s page will open. A subscription called my_pubsub_google_topic-sub is automatically created and listed in the subscriptions table.

  7. Click the MESSAGES tab

  8. Click PUBLISH MESSAGE

    • Number of messages: 1

    • Message body: Hello, world!

    • Click PUBLISH

  9. [Optional] Repeat the previous step and publish multiple messages.

  10. [Optional] Return to the topics page and create a second topic called my_pubsub_google_target.
    Do not publish any messages to it. This topic will be used later, when configuring a sink connector.
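Before moving on, it can help to confirm that the downloaded key-file looks like a service-account key. The following is a minimal sketch using only the Python standard library. The helper names `check_key_file` and `load_key_file` are hypothetical, and the list of required fields is an assumption based on the fields a service-account key-file normally contains. The sample values are placeholders, not real credentials.

```python
import json

# Fields a service-account key-file normally contains and that the
# connector configuration relies on later (assumed, not exhaustive).
REQUIRED_FIELDS = ("type", "project_id", "private_key", "client_email")


def check_key_file(key: dict) -> list[str]:
    """Return a list of problems found in a parsed key-file (empty if none)."""
    problems = [
        f"missing field: {name}" for name in REQUIRED_FIELDS if not key.get(name)
    ]
    if key.get("type") and key["type"] != "service_account":
        problems.append(f"unexpected type: {key['type']}")
    return problems


def load_key_file(path: str) -> dict:
    """Parse the downloaded key-file from disk."""
    with open(path, encoding="utf-8") as handle:
        return json.load(handle)


if __name__ == "__main__":
    # Placeholder values only; a real key-file holds a real private key.
    sample = {
        "type": "service_account",
        "project_id": "decisive-lambda-369420",
        "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
        "client_email": "my-pubsub-admin@decisive-lambda-369420.iam.gserviceaccount.com",
    }
    # An empty list means no problems were found.
    print(check_key_file(sample))
```

To check a real file, pass the result of `load_key_file("path/to/key-file.json")` to `check_key_file`; the path shown here is a placeholder.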

Configuring a new source connector

  1. Follow the "Creating streams" section of the Self-Service stream management documentation to create one stream and deploy it to an environment.
    The name of the stream will be my_pubsub_kafka_topic.
    The key/value types will be String.

  2. Follow the Configuring Connector-Applications documentation to set up a new Connector-Application.
    Let’s call it my_pubsub_source. The plugin name is "com.google.pubsub.kafka.source.CloudPubSubSourceConnector".
    The configuration values you need to supply are listed in this section.
    Configure the security certificate as instructed.

  3. Configure the Connector-Application deployment:

    • Open the previously downloaded key-file. We’ll use values from this file as part of our configuration.

    • Provide the following minimal configuration in order to connect to the previously configured Google Pub/Sub instance.
      For advanced configuration, see the official connector documentation.

      cps.project

      Copy the project_id value from the downloaded key-file
      Example value:
      decisive-lambda-369420

      gcp.credentials.json

      Copy the entire JSON content as-is from the downloaded key-file. Newlines are removed automatically.

      cps.subscription

      my_pubsub_google_topic-sub

      kafka.topic

      my_pubsub_kafka_topic

      kafka.partition.scheme

      kafka_partitioner

  4. Authorize the my_pubsub_source Connector-Application to produce to the my_pubsub_kafka_topic topic.

  5. You can now start the Connector-Application.

  6. You can now check the stream to see the events published by the connector. The values are base64-encoded.
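To read a base64-encoded value from the stream as text, you can decode it locally. This is a minimal sketch using the Python standard library. The helper name `decode_value` is hypothetical, and it assumes the original message body was UTF-8 text, as in the "Hello, world!" message published earlier.

```python
import base64


def decode_value(encoded: str) -> str:
    """Decode a base64 string and interpret the resulting bytes as UTF-8 text."""
    return base64.b64decode(encoded).decode("utf-8")


if __name__ == "__main__":
    # "SGVsbG8sIHdvcmxkIQ==" is the base64 form of the example message body.
    print(decode_value("SGVsbG8sIHdvcmxkIQ=="))  # prints: Hello, world!
```

Message bodies that are not UTF-8 text (for example binary payloads) would need `base64.b64decode` alone, without the final `.decode("utf-8")` step.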

Configuring a new sink connector

  1. Follow the Configuring Connector-Applications documentation to set up a new Connector-Application.
    Let’s call it my_pubsub_sink. The plugin name is "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector".
    The configuration values you need to supply are listed in this section.
    Configure the security certificate as instructed.

  2. Configure the Connector-Application deployment:

    • Open the previously downloaded key-file. We’ll use values from this file as part of our configuration.

    • Provide the following minimal configuration in order to connect to the previously configured Google Pub/Sub instance.
      For advanced configuration, see the official connector documentation.

      cps.project

      Copy the project_id value from the downloaded key-file
      Example value:
      decisive-lambda-369420

      gcp.credentials.json

      Copy the entire JSON content as-is from the downloaded key-file. Newlines are removed automatically.

      cps.topic

      my_pubsub_google_target

      topics

      my_pubsub_kafka_topic

  3. Authorize the my_pubsub_sink Connector-Application to consume the my_pubsub_kafka_topic topic.

  4. You can now start the Connector-Application.

  5. You can now check the Google topic to see the events published by the connector.

    • Open the Google topics page

    • Click the my_pubsub_google_target topic.

    • Click the MESSAGES tab

    • Click the dropdown below and select the only subscription available

    • Click PULL

    • The values are displayed as base64-encoded strings.
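The minimal source and sink configurations listed in the two sections above can also be assembled from the key-file programmatically, which avoids copy-paste mistakes. The sketch below uses only the Python standard library and produces plain key/value pairs; how you enter them (for example through the Self-Service UI) is up to you. The helper names `compact_credentials`, `source_config` and `sink_config` are hypothetical, and the sample key is a truncated placeholder rather than a usable credential.

```python
import json


def compact_credentials(key: dict) -> str:
    """Serialize the parsed key-file as a single line of JSON (no newlines)."""
    return json.dumps(key, separators=(",", ":"))


def source_config(key: dict) -> dict:
    """Minimal source connector settings, using the names from this guide."""
    return {
        "cps.project": key["project_id"],
        "gcp.credentials.json": compact_credentials(key),
        "cps.subscription": "my_pubsub_google_topic-sub",
        "kafka.topic": "my_pubsub_kafka_topic",
        "kafka.partition.scheme": "kafka_partitioner",
    }


def sink_config(key: dict) -> dict:
    """Minimal sink connector settings, using the names from this guide."""
    return {
        "cps.project": key["project_id"],
        "gcp.credentials.json": compact_credentials(key),
        "cps.topic": "my_pubsub_google_target",
        "topics": "my_pubsub_kafka_topic",
    }


if __name__ == "__main__":
    # Placeholder key; in practice, load the downloaded key-file with json.load.
    key = {"type": "service_account", "project_id": "decisive-lambda-369420"}
    for name, config in (("source", source_config(key)), ("sink", sink_config(key))):
        print(name)
        for setting, value in config.items():
            print(f"  {setting} = {value}")
```

Serializing with `json.dumps` keeps the escaped `\n` sequences inside the private key intact while producing a single-line value for `gcp.credentials.json`.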

Cleanup

Once you are done, stop the Connector-Application and clean up the unused Axual resources.

Don’t forget to return to your Pub/Sub topic list and delete your Google topics, as well as the my-pubsub-admin service account. Also check your IAM principals and remove any principal that was created as a byproduct of this example.

License

Google Pub/Sub source and sink connectors are licensed under the Apache License, Version 2.0.

Source code

The source code for the connector can be found on github.com/GoogleCloudPlatform/pubsub