Google Pub/Sub Lite source and sink connectors

Google Pub/Sub Lite

The CloudPubSubConnector is a connector to be used with Kafka Connect to publish messages from Kafka to Google Cloud Pub/Sub or Pub/Sub Lite and vice versa.

If you already have a publicly accessible deployment of Google Pub/Sub Lite, you can configure and install a connector right away using the Axual Self-Service UI. Use the configuration instructions below.

If you don’t have one available, follow the next section to deploy a publicly available instance of Google Pub/Sub Lite.

The Pub/Sub Lite connector requires authentication against Google. This authentication is not done at the connector-plugin level, but at the Connect-Cluster level.
This means there can only be one Pub/Sub Lite connector per cluster.
For the first connector of this kind, we’ll use the environment variable approach.
If more such connectors need to run within the same Connect-Cluster, Axual developers will have to look into a workaround.

Deploying a Google Pub/Sub Lite instance

We’ll deploy a Google Pub/Sub Lite instance using Google Cloud Services.

If you already have a Google account, you can sign up for a free trial of Google Cloud here.

You can read more about Google Pub/Sub Lite on Google Cloud here.

Let’s get started.

  1. Create a service account here (You might get prompted to select your Google project).
    You can reuse the service account you created for Google Pub/Sub, if you have it.

    • At the top of the page, click +CREATE SERVICE ACCOUNT. A new page will open. Fill in the following:

      • Service account name: my-pubsub-admin

      • Service account ID: my-pubsub-admin

      • Service account description: "Temporary admin account to manage Google Pub/Sub"

    • Click CREATE AND CONTINUE

    • We’ll grant this service account access to the project through two roles.

      • Click the dropdown called "Select a role" and, in the filter box, type Pub/Sub Lite Admin. Click "Pub/Sub Lite Admin".

      • Click +ADD ANOTHER ROLE. Repeat the previous step with Pub/Sub Admin as a filter.

    • We are not granting users access to this service account, so click Done instead.

  2. Click the newly created service account. A new page will open.

  3. Click the KEYS tab.

  4. Click the ADD KEY button and select "Create a new key". Use "JSON" Key type and click CREATE.
    A key will be created and automatically downloaded to your machine. We’ll refer to it later as "the downloaded key-file".
    You can now close the "Service accounts" tab.

  5. Enable Google Pub/Sub Lite for your project by visiting this page.

  6. At the top of the page, click + CREATE LITE RESERVATION

    • Location: Choose europe-west1 (Belgium)

    • Click CONTINUE

    • Name: my-pubsub-lite

    • Click CONTINUE

    • Throughput: 1

    • Click CONTINUE

    • Review: Just click CREATE
      Your new lite reservation page will show up.

  7. At the bottom of the page, click + CREATE LITE TOPIC

    • Location: Select Zonal Lite Topic and select europe-west1-b from the dropdown

    • Click CONTINUE

    • Name: my_pubsub_lite_google_topic

    • Click CONTINUE

    • Throughput:

      • Peak Publish Throughput (MiB/s): 4 (default)

      • Peak Subscribe Throughput (MiB/s): 4 (default)

    • Click CONTINUE

    • Message storage:

      • Storage per partition (GiB): 30 (default, minimum)

      • Retention period: Drop messages after specified duration even if storage is available

        • Days: 1

    • Click CONTINUE

    • Review: Just click CREATE

  8. The topic page will open now.
    The first listed property is Lite topic name. This property value begins with: projects/123456789012/…​
    Note down the number (e.g. 123456789012). We’ll call it "the project number". We’ll use it when configuring the connector-application, as the pubsublite.project configuration value.

  9. While still on the lite topic page, click CREATE LITE SUBSCRIPTION:

    • Lite subscription ID: my_pubsub_lite_google_topic-sub

    • Delivery requirement (required): Deliver messages immediately (default)

    • Starting Offset (required): Beginning

    • Click CREATE

  10. We’ll now prepare to publish a message to the topic using Google Cloud shell.
    At the top-right of your screen, next to your picture, are a few buttons.
    One of them is "Activate Cloud Shell". Click it.
    A shell will open at the bottom of your screen. Copy-Paste the following code-block:

    pip3 install --upgrade google-cloud-pubsublite
    wget -O ~/produce-to-lite-topic.py https://raw.githubusercontent.com/googleapis/python-pubsublite/v1.4.2/samples/snippets/publisher_example.py
    export GOOGLE_APPLICATION_CREDENTIALS=~/keys.json
    cat << } > ~/keys.json
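  11. Now paste the contents of your downloaded key file.
    The closing } on the last line of the key file ends the heredoc started by the previous command, so it is not written to ~/keys.json.

  12. Paste this command to append the closing brace that the heredoc consumed: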

    echo '}' >> ~/keys.json
  13. Produce to the topic using the following command:

    python3 ~/produce-to-lite-topic.py    \
            $(gcloud projects list | grep "$(gcloud config get-value project)" | awk '{print $(NF)}') \
            'europe-west1'                \
            'b'                           \
            'my_pubsub_lite_google_topic' \
            ''
  14. [Optional] Repeat the previous step and publish multiple messages.
    These steps are similar to this documentation page from Google.

  15. [Optional] Create a second lite topic called my_pubsub_lite_google_target. Go to your lite reservations and click on the my-pubsub-lite reservation, then follow the same steps you did before to finalize the creation of the topic.
    You don’t need to produce any events to it. This topic will be used later, when configuring a sink connector.
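
Steps 10 to 12 above reassemble the key file by hand in Cloud Shell, which is easy to get wrong (a missing closing brace, a partial paste). The following is a minimal sketch of a sanity check, not part of the official procedure. It assumes the key file is the standard Google service account JSON and that the field names in REQUIRED_FIELDS are present in it; check_key_file is a hypothetical helper name.

```python
import json
import os

# Assumption: fields normally present in a Google service account key file.
REQUIRED_FIELDS = ("type", "project_id", "private_key", "client_email")


def check_key_file(path):
    """Return a list of problems found in the key file (empty list means OK)."""
    try:
        with open(os.path.expanduser(path), encoding="utf-8") as handle:
            key = json.load(handle)
    except OSError as error:
        return [f"cannot read {path}: {error}"]
    except json.JSONDecodeError as error:
        return [f"{path} is not valid JSON: {error}"]

    if not isinstance(key, dict):
        return [f"{path} does not contain a JSON object"]

    problems = [f"missing field: {name}" for name in REQUIRED_FIELDS if name not in key]
    if key.get("type") != "service_account":
        problems.append('field "type" is not "service_account"')
    return problems


if __name__ == "__main__":
    # Uses the same variable that was exported in step 10, falling back to ~/keys.json.
    path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", "~/keys.json")
    problems = check_key_file(path)
    print("key file looks usable" if not problems else "\n".join(problems))
```

Running it in Cloud Shell before step 13 should catch a truncated paste early, since the publisher script would otherwise fail later with a less obvious credentials error.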

Configuring a new source connector

  1. Follow the Creating streams documentation in order to create one stream and deploy it onto an environment.
    The name of the stream will be my_pubsub_lite_kafka_topic.
    The key/value types will be String.

  2. Follow the Configuring Connector-Applications documentation to set up a new connector application.
    Let’s call it my_pubsub_lite_source. The plugin name is "com.google.pubsublite.kafka.source.PubSubLiteSourceConnector".
    The values you will need to supply as configuration will be listed in this section.
    Configure the security certificate as instructed.

  3. Configuring the Connector-application deployment:

    • Open the previously downloaded key-file. We’ll use values from this file as part of our configuration.

    • Provide the following minimal configuration in order to connect to the previously configured Google Pub/Sub Lite instance.
      For advanced configuration, see the official connector documentation.

      kafka.topic              my_pubsub_lite_kafka_topic
      pubsublite.location      europe-west1-b
      pubsublite.project       The project number you wrote down after creating the my_pubsub_lite_google_topic topic.
      pubsublite.subscription  my_pubsub_lite_google_topic-sub

  4. Authorize the my_pubsub_lite_source Connector-Application to produce to the my_pubsub_lite_kafka_topic topic.

  5. You cannot start the Connector-Application yet.
    As explained in the introduction, the Pub/Sub Lite connector authenticates against Google at the Connect-Cluster level, not at the connector-plugin level, so the service account key has to be made available to the Connect-Cluster first.

  6. Contact Axual to find a solution for authenticating against Google with your service account key.

  7. Start the Connector-Application

  8. You can now check the stream to see the events published by the connector. The values will be base64 encoded.
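
Since the values on the stream are shown base64 encoded, it can help to decode one by hand to confirm that it matches what was published. The following is a minimal sketch using only the Python standard library; decode_value is a hypothetical helper name and the sample payload is an assumption for illustration, not necessarily what your topic contains.

```python
import base64


def decode_value(encoded):
    """Decode a base64 string to text, or return raw bytes if it is not UTF-8."""
    raw = base64.b64decode(encoded, validate=True)
    try:
        return raw.decode("utf-8")
    except UnicodeDecodeError:
        return raw


if __name__ == "__main__":
    # Hypothetical payload: encode a sample message, then decode it again.
    sample = base64.b64encode(b"Hello world!").decode("ascii")
    print(sample)
    print(decode_value(sample))
```

To inspect a real event, copy the encoded value from the stream browser and pass it to decode_value.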

Configuring a new sink connector

  1. Follow the Configuring Connector-Applications documentation to set up a new Connector application.
    Let’s call it my_pubsub_lite_sink. The plugin name is "com.google.pubsublite.kafka.sink.PubSubLiteSinkConnector".
    The values you will need to supply as configuration will be listed in this section.
    Configure the security certificate as instructed.

  2. Configuring the Connector-application deployment:

    • Open the previously downloaded key-file. We’ll use values from this file as part of our configuration.

    • Provide the following minimal configuration in order to connect to the previously configured Google Pub/Sub Lite instance.
      For advanced configuration, see the official connector documentation.

      pubsublite.topic     my_pubsub_lite_google_target
      pubsublite.location  europe-west1-b
      pubsublite.project   The project number you wrote down after creating the my_pubsub_lite_google_topic topic.

  3. Authorize the my_pubsub_lite_sink Connector-Application to consume from the my_pubsub_lite_kafka_topic topic (or an alternative topic).

  4. You cannot start the Connector-Application yet.
    As explained in the introduction, the Pub/Sub Lite connector authenticates against Google at the Connect-Cluster level, not at the connector-plugin level, so the service account key has to be made available to the Connect-Cluster first.

  5. Contact Axual to find a solution for authenticating against Google with your service account key.

  6. Start the Connector-Application

  7. You can now check the Google topic to see the events published by the connector.
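
The three pubsublite.* values used in step 2 can all be read off the "Lite topic name" property shown on the Google topic page. The following is a rough sketch of that mapping. It assumes the full name follows the pattern projects/<project number>/locations/<zone>/topics/<topic id> (the document only shows the beginning of the name); sink_settings is a hypothetical helper name.

```python
import re

# Assumption: a Lite topic name has the form
# projects/<project number>/locations/<zone>/topics/<topic id>
TOPIC_PATH = re.compile(
    r"projects/(?P<project>\d+)/locations/(?P<location>[a-z0-9-]+)/topics/(?P<topic>[^/]+)"
)


def sink_settings(lite_topic_name):
    """Map a Lite topic name to the minimal sink connector settings."""
    match = TOPIC_PATH.fullmatch(lite_topic_name)
    if match is None:
        raise ValueError(f"unexpected Lite topic name: {lite_topic_name!r}")
    return {
        "pubsublite.project": match["project"],
        "pubsublite.location": match["location"],
        "pubsublite.topic": match["topic"],
    }


if __name__ == "__main__":
    # Example name built from the values used in this guide.
    name = "projects/123456789012/locations/europe-west1-b/topics/my_pubsub_lite_google_target"
    for key, value in sink_settings(name).items():
        print(f"{key} = {value}")
```

Deriving the values from one copied string avoids mixing up the project number with the project ID, which is an easy mistake to make when filling in the form by hand.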

Cleanup

Once you are done, stop the Connector applications and clean up the unused Axual resources.

Don’t forget to return to your Pub/Sub Lite Topic List and delete your Google topics, as well as the my-pubsub-admin service account. Also check your IAM principals and remove any principal that was created as a byproduct of this example.

License

Google Pub/Sub Lite source and sink connectors are licensed under the Apache License, Version 2.0.

Source code

The source code for the connector can be found on github.com/GoogleCloudPlatform/pubsub